Merge pull request #8125 from influxdata/cn+savage/ingester-partition-pruning

feat: partition pruning for ingester query responses
pull/24376/head
Dom 2023-07-03 16:38:07 +01:00 committed by GitHub
commit 1acbf4a20d
37 changed files with 893 additions and 288 deletions
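In short, this merge does two things visible in the diffs below: ingester query responses are now returned per partition (each partition carries its own app_metadata, schema, and record batches), and the ingester can prune entire partitions from a response by deriving per-column min/max ranges from the partition key and evaluating the query predicate against them. A self-contained sketch of the pruning idea (illustration only, not code from this diff):

// For a partition keyed on a single tag value, the derived range for
// that column is [value, value]; an equality predicate whose literal
// falls outside the range proves the partition holds no matching rows.
fn partition_may_match(min: &str, max: &str, literal: &str) -> bool {
    min <= literal && literal <= max
}

fn main() {
    // A partition keyed on tag1 = "v1b" cannot match `tag1 = 'v1a'`.
    assert!(!partition_may_match("v1b", "v1b", "v1a")); // pruned
    // A partition keyed on tag1 = "v1a" may match and is returned.
    assert!(partition_may_match("v1a", "v1a", "v1a"));
}

When pruning cannot decide (or errors), the partition's data is returned as before; pruning is an optimization, never a correctness mechanism.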

Cargo.lock generated
View File

@ -2628,6 +2628,7 @@ dependencies = [
"parquet_file",
"paste",
"pin-project",
"predicate",
"prost",
"rand",
"schema",

View File

@ -1323,10 +1323,15 @@ async fn assert_ingester_contains_results(
.await
.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid;
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("at least one ingester partition");
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid;
assert!(!ingester_uuid.is_empty());
assert_batches_sorted_eq!(expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(expected, &ingester_partition.record_batches);
}
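A reader's sketch of the per-partition response shape these tests now consume. The field names come from the assertions above; the leaf types are placeholders, not the real influxdb_iox_client types:

struct AppMetadata {
    ingester_uuid: String,
    completed_persistence_count: u64,
}
struct Schema; // stands in for the Arrow schema
struct RecordBatch; // stands in for arrow::record_batch::RecordBatch
struct IngesterResponsePartition {
    app_metadata: AppMetadata,
    schema: Option<Schema>,
    record_batches: Vec<RecordBatch>,
}
struct IngesterResponse {
    // One entry per partition, replacing the previous single flat response.
    partitions: Vec<IngesterResponsePartition>,
}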
#[tokio::test]

View File

@ -1,8 +1,14 @@
use arrow::datatypes::DataType;
use arrow_flight::{error::FlightError, Ticket};
use arrow_util::assert_batches_sorted_eq;
use data_types::{NamespaceId, TableId};
use datafusion::{
prelude::{col, lit},
scalar::ScalarValue,
};
use futures::FutureExt;
use http::StatusCode;
use influxdb_iox_client::table::generated_types::{Part, PartitionTemplate, TemplatePart};
use ingester_query_grpc::{influxdata::iox::ingester::v1 as proto, IngesterQueryRequest};
use prost::Message;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
@ -39,7 +45,14 @@ async fn persist_on_demand() {
.await
.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid;
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid;
assert!(!ingester_uuid.is_empty());
let expected = [
@ -49,7 +62,7 @@ async fn persist_on_demand() {
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches);
}
.boxed()
})),
@ -77,8 +90,15 @@ async fn persist_on_demand() {
.await
.unwrap();
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let num_files_persisted =
ingester_response.app_metadata.completed_persistence_count;
ingester_partition.app_metadata.completed_persistence_count;
assert_eq!(num_files_persisted, 1);
}
.boxed()
@ -121,11 +141,17 @@ async fn ingester_flight_api() {
.query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection())
.await
.unwrap();
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let ingester_uuid = ingester_response.app_metadata.ingester_uuid.clone();
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid.clone();
assert!(!ingester_uuid.is_empty());
let schema = ingester_response.schema.unwrap();
let schema = ingester_partition.schema.unwrap();
let expected = [
"+------+------+--------------------------------+-----+",
@ -135,11 +161,11 @@ async fn ingester_flight_api() {
"| B | A | 1970-01-01T00:00:00.001234567Z | 84 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches);
// Also ensure that the schema of the batches matches what is
// reported by the performed_query.
ingester_response
ingester_partition
.record_batches
.iter()
.enumerate()
@ -152,7 +178,13 @@ async fn ingester_flight_api() {
.query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection())
.await
.unwrap();
assert_eq!(ingester_response.app_metadata.ingester_uuid, ingester_uuid);
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
assert_eq!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid);
// Restart the ingesters
cluster.restart_ingesters().await;
@ -167,7 +199,146 @@ async fn ingester_flight_api() {
.query_ingester(query, cluster.ingester().ingester_grpc_connection())
.await
.unwrap();
assert_ne!(ingester_response.app_metadata.ingester_uuid, ingester_uuid);
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
assert_ne!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid);
}
#[tokio::test]
async fn ingester_partition_pruning() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
// Set up cluster
let mut cluster = MiniCluster::create_shared_never_persist(database_url).await;
let mut steps: Vec<_> = vec![Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let namespace_name = state.cluster().namespace();
let mut namespace_client = influxdb_iox_client::namespace::Client::new(
state.cluster().router().router_grpc_connection(),
);
namespace_client
.create_namespace(
namespace_name,
None,
None,
Some(PartitionTemplate {
parts: vec![
TemplatePart {
part: Some(Part::TagValue("tag1".into())),
},
TemplatePart {
part: Some(Part::TagValue("tag3".into())),
},
],
}),
)
.await
.unwrap();
let mut table_client = influxdb_iox_client::table::Client::new(
state.cluster().router().router_grpc_connection(),
);
// table1: create implicitly by writing to it
// table2: do not override partition template => use namespace template
table_client
.create_table(namespace_name, "table2", None)
.await
.unwrap();
// table3: override namespace template
table_client
.create_table(
namespace_name,
"table3",
Some(PartitionTemplate {
parts: vec![TemplatePart {
part: Some(Part::TagValue("tag2".into())),
}],
}),
)
.await
.unwrap();
}
.boxed()
}))]
.into_iter()
.chain((1..=3).flat_map(|tid| {
[Step::WriteLineProtocol(
[
format!("table{tid},tag1=v1a,tag2=v2a,tag3=v3a f=1 11"),
format!("table{tid},tag1=v1b,tag2=v2a,tag3=v3a f=1 11"),
format!("table{tid},tag1=v1a,tag2=v2b,tag3=v3a f=1 11"),
format!("table{tid},tag1=v1b,tag2=v2b,tag3=v3a f=1 11"),
format!("table{tid},tag1=v1a,tag2=v2a,tag3=v3b f=1 11"),
format!("table{tid},tag1=v1b,tag2=v2a,tag3=v3b f=1 11"),
format!("table{tid},tag1=v1a,tag2=v2b,tag3=v3b f=1 11"),
format!("table{tid},tag1=v1b,tag2=v2b,tag3=v3b f=1 11"),
]
.join("\n"),
)]
.into_iter()
}))
.collect();
steps.push(Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
// Note: The querier will perform correct type coercion. We must simulate this here; otherwise the ingester
// will NOT be able to prune the data, because the predicate evaluation will fail with a type error
// and the predicate will be ignored.
let predicate = ::predicate::Predicate::new().with_expr(col("tag1").eq(lit(
ScalarValue::Dictionary(
Box::new(DataType::Int32),
Box::new(ScalarValue::from("v1a")),
),
)));
let query = IngesterQueryRequest::new(
state.cluster().namespace_id().await,
state.cluster().table_id("table1").await,
vec![],
Some(predicate),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let ingester_response = state
.cluster()
.query_ingester(
query.clone(),
state.cluster().ingester().ingester_grpc_connection(),
)
.await
.unwrap();
let expected = [
"+-----+------+------+------+--------------------------------+",
"| f | tag1 | tag2 | tag3 | time |",
"+-----+------+------+------+--------------------------------+",
"| 1.0 | v1a | v2a | v3a | 1970-01-01T00:00:00.000000011Z |",
"| 1.0 | v1a | v2a | v3b | 1970-01-01T00:00:00.000000011Z |",
"| 1.0 | v1a | v2b | v3a | 1970-01-01T00:00:00.000000011Z |",
"| 1.0 | v1a | v2b | v3b | 1970-01-01T00:00:00.000000011Z |",
"+-----+------+------+------+--------------------------------+",
];
let record_batches = ingester_response
.partitions
.into_iter()
.flat_map(|p| p.record_batches)
.collect::<Vec<_>>();
assert_batches_sorted_eq!(&expected, &record_batches);
}
.boxed()
})));
StepTest::new(&mut cluster, steps).run().await
}
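The namespace partition template above is (tag1, tag3), so table1's eight rows land in four partitions. A sketch of the pruning this test exercises; the `|`-joined key encoding is an assumption for illustration, not taken from this diff:

fn main() {
    // Partition keys derived from (tag1, tag3) for the writes above.
    let partitions = ["v1a|v3a", "v1b|v3a", "v1a|v3b", "v1b|v3b"];
    // Predicate `tag1 = 'v1a'`: only partitions whose first template
    // part is "v1a" can contain matching rows.
    let kept: Vec<&str> = partitions
        .iter()
        .copied()
        .filter(|key| key.starts_with("v1a|"))
        .collect();
    assert_eq!(kept, ["v1a|v3a", "v1a|v3b"]);
}

The expected table in the test shows exactly those two partitions' rows: tag1 is always v1a, while tag3 still takes both v3a and v3b.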
#[tokio::test]

View File

@ -193,7 +193,14 @@ async fn write_replication() {
.await
.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid;
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid;
assert!(!ingester_uuid.is_empty());
let expected = [
@ -212,7 +219,7 @@ async fn write_replication() {
"| A | B | 1970-01-01T00:00:00.000000020Z | 20 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches);
}
.boxed()
})));

View File

@ -32,6 +32,7 @@ once_cell = "1.18"
parking_lot = "0.12.1"
parquet_file = { version = "0.1.0", path = "../parquet_file" }
pin-project = "1.1.2"
predicate = { version = "0.1.0", path = "../predicate" }
prost = { version = "0.11.9", default-features = false, features = ["std"] }
rand = "0.8.5"
schema = { version = "0.1.0", path = "../schema" }

View File

@ -7,12 +7,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use metric::U64Counter;
use predicate::Predicate;
use trace::span::Span;
use super::{
partition::resolver::PartitionProvider,
post_write::PostWriteObserver,
table::{name_resolver::TableNameProvider, TableData},
table::{metadata_resolver::TableProvider, TableData},
};
use crate::{
arcmap::ArcMap,
@ -60,12 +61,13 @@ pub(crate) struct NamespaceData<O> {
/// A set of tables this [`NamespaceData`] instance has processed
/// [`IngestOp`]'s for.
///
/// The [`TableNameProvider`] acts as a [`DeferredLoad`] constructor to
/// resolve the [`TableName`] for new [`TableData`] out of the hot path.
/// The [`TableProvider`] acts as a [`DeferredLoad`] constructor to
/// resolve the catalog [`Table`] for new [`TableData`] out of the hot path.
///
/// [`TableName`]: crate::buffer_tree::table::TableName
///
/// [`Table`]: data_types::Table
tables: ArcMap<TableId, TableData<O>>,
table_name_resolver: Arc<dyn TableNameProvider>,
catalog_table_resolver: Arc<dyn TableProvider>,
/// The count of tables initialised in this Ingester so far, across all
/// namespaces.
table_count: U64Counter,
@ -83,7 +85,7 @@ impl<O> NamespaceData<O> {
pub(super) fn new(
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_name_resolver: Arc<dyn TableNameProvider>,
catalog_table_resolver: Arc<dyn TableProvider>,
partition_provider: Arc<dyn PartitionProvider>,
post_write_observer: Arc<O>,
metrics: &metric::Registry,
@ -99,7 +101,7 @@ impl<O> NamespaceData<O> {
namespace_id,
namespace_name,
tables: Default::default(),
table_name_resolver,
catalog_table_resolver,
table_count,
partition_provider,
post_write_observer,
@ -151,7 +153,7 @@ where
self.table_count.inc(1);
Arc::new(TableData::new(
table_id,
Arc::new(self.table_name_resolver.for_table(table_id)),
Arc::new(self.catalog_table_resolver.for_table(table_id)),
self.namespace_id,
Arc::clone(&self.namespace_name),
Arc::clone(&self.partition_provider),
@ -189,6 +191,7 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
assert_eq!(
self.namespace_id, namespace_id,
@ -204,7 +207,7 @@ where
// a tracing delegate to emit a child span.
Ok(QueryResponse::new(
QueryExecTracing::new(inner, "table")
.query_exec(namespace_id, table_id, columns, span)
.query_exec(namespace_id, table_id, columns, span, predicate)
.await?,
))
}
@ -226,7 +229,7 @@ mod tests {
test_util::{
defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER,
ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER,
},
};
@ -243,7 +246,7 @@ mod tests {
let ns = NamespaceData::new(
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_ms(),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
partition_provider,
Arc::new(MockPostWriteObserver::default()),
&metrics,

View File

@ -14,7 +14,7 @@ use self::{
buffer::{traits::Queryable, BufferState, DataBuffer, Persisting},
persisting::{BatchIdent, PersistingData},
};
use super::{namespace::NamespaceName, table::TableName};
use super::{namespace::NamespaceName, table::TableMetadata};
use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor};
mod buffer;
@ -73,9 +73,9 @@ pub struct PartitionData {
/// The catalog ID for the table this partition is part of.
table_id: TableId,
/// The name of the table this partition is part of, potentially unresolved
/// The catalog metadata for the table this partition is part of, potentially unresolved
/// / deferred.
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
/// A [`DataBuffer`] for incoming writes.
buffer: DataBuffer,
@ -108,7 +108,7 @@ impl PartitionData {
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
sort_key: SortKeyState,
) -> Self {
Self {
@ -119,7 +119,7 @@ impl PartitionData {
namespace_id,
namespace_name,
table_id,
table_name,
table,
buffer: DataBuffer::default(),
persisting: VecDeque::with_capacity(1),
started_persistence_count: BatchIdent::default(),
@ -139,7 +139,7 @@ impl PartitionData {
trace!(
namespace_id = %self.namespace_id,
table_id = %self.table_id,
table_name = %self.table_name,
table = %self.table,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
"buffered write"
@ -175,7 +175,7 @@ impl PartitionData {
trace!(
namespace_id = %self.namespace_id,
table_id = %self.table_id,
table_name = %self.table_name,
table = %self.table,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
n_batches = data.len(),
@ -221,7 +221,7 @@ impl PartitionData {
debug!(
namespace_id = %self.namespace_id,
table_id = %self.table_id,
table_name = %self.table_name,
table = %self.table,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
%batch_ident,
@ -271,7 +271,7 @@ impl PartitionData {
persistence_count = %self.completed_persistence_count,
namespace_id = %self.namespace_id,
table_id = %self.table_id,
table_name = %self.table_name,
table = %self.table,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
batch_ident = %batch.batch_ident(),
@ -302,10 +302,10 @@ impl PartitionData {
self.completed_persistence_count
}
/// Return the name of the table this [`PartitionData`] is buffering writes
/// Return the metadata of the table this [`PartitionData`] is buffering writes
/// for.
pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> {
&self.table_name
pub(crate) fn table(&self) -> &Arc<DeferredLoad<TableMetadata>> {
&self.table
}
/// Return the table ID for this partition.

View File

@ -14,7 +14,7 @@ use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{resolver::SortKeyResolver, PartitionData, SortKeyState},
table::TableName,
table::TableMetadata,
},
deferred_load::DeferredLoad,
};
@ -173,7 +173,7 @@ where
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
// Use the cached PartitionKey instead of the caller's partition_key,
// preferring to reuse the already-shared Arc<str> in the cache.
@ -203,7 +203,7 @@ where
namespace_id,
namespace_name,
table_id,
table_name,
table,
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
)));
}
@ -212,13 +212,7 @@ where
// Otherwise delegate to the catalog / inner impl.
self.inner
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
)
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.await
}
}
@ -234,7 +228,7 @@ mod tests {
use crate::{
buffer_tree::partition::resolver::mock::MockPartitionProvider,
test_util::{
defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder,
defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID,
ARBITRARY_PARTITION_KEY, ARBITRARY_PARTITION_KEY_STR, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME,
@ -270,15 +264,15 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
)
.await;
assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID);
assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID);
assert_eq!(
&**got.lock().table_name().get().await,
&***ARBITRARY_TABLE_NAME
&**got.lock().table().get().await.name(),
&**ARBITRARY_TABLE_NAME
);
assert_eq!(
&**got.lock().namespace_name().get().await,
@ -309,15 +303,15 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
)
.await;
assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID);
assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID);
assert_eq!(
&**got.lock().table_name().get().await,
&***ARBITRARY_TABLE_NAME
&**got.lock().table().get().await.name(),
&**ARBITRARY_TABLE_NAME
);
assert_eq!(
&**got.lock().namespace_name().get().await,
@ -366,15 +360,15 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
)
.await;
assert_eq!(got.lock().partition_id(), other_key_id);
assert_eq!(got.lock().table_id(), ARBITRARY_TABLE_ID);
assert_eq!(
&**got.lock().table_name().get().await,
&***ARBITRARY_TABLE_NAME
&**got.lock().table().get().await.name(),
&**ARBITRARY_TABLE_NAME
);
}
@ -402,15 +396,15 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
other_table,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
)
.await;
assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID);
assert_eq!(got.lock().table_id(), other_table);
assert_eq!(
&**got.lock().table_name().get().await,
&***ARBITRARY_TABLE_NAME
&**got.lock().table().get().await.name(),
&**ARBITRARY_TABLE_NAME
);
}
}

View File

@ -15,7 +15,7 @@ use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{PartitionData, SortKeyState},
table::TableName,
table::TableMetadata,
},
deferred_load::DeferredLoad,
};
@ -61,12 +61,12 @@ impl PartitionProvider for CatalogPartitionResolver {
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
debug!(
%partition_key,
%table_id,
%table_name,
%table,
"upserting partition in catalog"
);
let p = Backoff::new(&self.backoff_config)
@ -86,7 +86,7 @@ impl PartitionProvider for CatalogPartitionResolver {
namespace_id,
namespace_name,
table_id,
table_name,
table,
SortKeyState::Provided(p.sort_key()),
)))
}
@ -103,6 +103,7 @@ mod tests {
use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table};
use super::*;
use crate::buffer_tree::table::TableName;
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "ns-bananas";
@ -138,17 +139,25 @@ mod tests {
table_id,
Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { TableName::from(TABLE_NAME) },
async {
TableMetadata::new_for_testing(
TableName::from(TABLE_NAME),
Default::default(),
)
},
&metrics,
)),
)
.await;
// Ensure the table name is available.
let _ = got.lock().table_name().get().await;
let _ = got.lock().table().get().await.name();
assert_eq!(got.lock().namespace_id(), namespace_id);
assert_eq!(got.lock().table_name().to_string(), table_name.to_string());
assert_eq!(
got.lock().table().get().await.name().to_string(),
table_name.to_string()
);
assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None));
assert!(got.lock().partition_key.ptr_eq(&callers_partition_key));

View File

@ -14,7 +14,7 @@ use hashbrown::{hash_map::Entry, HashMap};
use parking_lot::Mutex;
use crate::{
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata},
deferred_load::DeferredLoad,
};
@ -146,7 +146,7 @@ where
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
let key = Key {
namespace_id,
@ -170,7 +170,7 @@ where
namespace_id,
namespace_name,
table_id,
table_name,
table,
));
// Make the future poll-able by many callers, all of which
@ -233,7 +233,7 @@ async fn do_fetch<T>(
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>>
where
T: PartitionProvider + 'static,
@ -248,13 +248,7 @@ where
// (which would cause the connection to be returned).
tokio::spawn(async move {
inner
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
)
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.await
})
.await
@ -280,7 +274,7 @@ mod tests {
use crate::{
buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState},
test_util::{
defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder,
defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
},
};
@ -308,7 +302,7 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
)
})
.collect::<FuturesUnordered<_>>()
@ -342,7 +336,7 @@ mod tests {
_namespace_id: NamespaceId,
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
_table_id: TableId,
_table_name: Arc<DeferredLoad<TableName>>,
_table: Arc<DeferredLoad<TableMetadata>>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Arc<Mutex<PartitionData>>>
@ -368,7 +362,7 @@ mod tests {
let data = PartitionDataBuilder::new().build();
let namespace_loader = defer_namespace_name_1_sec();
let table_name_loader = defer_table_name_1_sec();
let table_loader = defer_table_metadata_1_sec();
// Add a single instance of the partition - if more than one call is
// made to the mock, it will panic.
@ -384,14 +378,14 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
Arc::clone(&namespace_loader),
ARBITRARY_TABLE_ID,
Arc::clone(&table_name_loader),
Arc::clone(&table_loader),
);
let pa_2 = layer.get_partition(
ARBITRARY_PARTITION_KEY.clone(),
ARBITRARY_NAMESPACE_ID,
Arc::clone(&namespace_loader),
ARBITRARY_TABLE_ID,
Arc::clone(&table_name_loader),
Arc::clone(&table_loader),
);
let waker = futures::task::noop_waker();
@ -411,7 +405,7 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
namespace_loader,
ARBITRARY_TABLE_ID,
table_name_loader,
table_loader,
)
.with_timeout_panic(Duration::from_secs(5))
.await;
@ -441,7 +435,7 @@ mod tests {
_namespace_id: NamespaceId,
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
_table_id: TableId,
_table_name: Arc<DeferredLoad<TableName>>,
_table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
let waker = self.wait.notified();
let permit = self.sem.acquire().await.unwrap();
@ -481,7 +475,7 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
);
let waker = futures::task::noop_waker();

View File

@ -8,7 +8,7 @@ use parking_lot::Mutex;
use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata},
deferred_load::{self, DeferredLoad},
};
@ -53,7 +53,7 @@ impl PartitionProvider for MockPartitionProvider {
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
let p = self
.partitions
@ -75,8 +75,8 @@ impl PartitionProvider for MockPartitionProvider {
deferred_load::UNRESOLVED_DISPLAY_STRING,
);
let actual_table_name = p.table_name().to_string();
let expected_table_name = table_name.get().await.to_string();
let actual_table_name = p.table().to_string();
let expected_table_name = table.get().await.name().to_string();
assert!(
(actual_table_name.as_str() == expected_table_name)
|| (actual_table_name == deferred_load::UNRESOLVED_DISPLAY_STRING),

View File

@ -5,7 +5,7 @@ use data_types::{NamespaceId, PartitionKey, TableId};
use parking_lot::Mutex;
use crate::{
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableName},
buffer_tree::{namespace::NamespaceName, partition::PartitionData, table::TableMetadata},
deferred_load::DeferredLoad,
};
@ -24,7 +24,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>>;
}
@ -39,16 +39,10 @@ where
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
) -> Arc<Mutex<PartitionData>> {
(**self)
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table_name,
)
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.await
}
}
@ -61,7 +55,7 @@ mod tests {
use crate::{
buffer_tree::partition::{resolver::mock::MockPartitionProvider, SortKeyState},
test_util::{
defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder,
defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
ARBITRARY_TABLE_ID,
},
@ -70,10 +64,10 @@ mod tests {
#[tokio::test]
async fn test_arc_impl() {
let namespace_loader = defer_namespace_name_1_sec();
let table_name_loader = defer_table_name_1_sec();
let table_loader = defer_table_metadata_1_sec();
let data = PartitionDataBuilder::new()
.with_table_name_loader(Arc::clone(&table_name_loader))
.with_table_loader(Arc::clone(&table_loader))
.with_namespace_loader(Arc::clone(&namespace_loader))
.build();
@ -85,7 +79,7 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
Arc::clone(&namespace_loader),
ARBITRARY_TABLE_ID,
Arc::clone(&table_name_loader),
Arc::clone(&table_loader),
)
.await;
assert_eq!(got.lock().partition_id(), ARBITRARY_PARTITION_ID);
@ -94,9 +88,6 @@ mod tests {
got.lock().namespace_name().to_string(),
namespace_loader.to_string()
);
assert_eq!(
got.lock().table_name().to_string(),
table_name_loader.to_string()
);
assert_eq!(got.lock().table().to_string(), table_loader.to_string());
}
}

View File

@ -4,13 +4,14 @@ use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use metric::U64Counter;
use parking_lot::Mutex;
use predicate::Predicate;
use trace::span::Span;
use super::{
namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
partition::{resolver::PartitionProvider, PartitionData},
post_write::PostWriteObserver,
table::name_resolver::TableNameProvider,
table::metadata_resolver::TableProvider,
};
use crate::{
arcmap::ArcMap,
@ -92,12 +93,12 @@ pub(crate) struct BufferTree<O> {
/// [`NamespaceName`]: data_types::NamespaceName
namespaces: ArcMap<NamespaceId, NamespaceData<O>>,
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
/// The [`TableName`] provider used by [`NamespaceData`] to initialise a
/// The [`TableMetadata`] provider used by [`NamespaceData`] to initialise a
/// [`TableData`].
///
/// [`TableName`]: crate::buffer_tree::table::TableName
/// [`TableMetadata`]: crate::buffer_tree::table::TableMetadata
/// [`TableData`]: crate::buffer_tree::table::TableData
table_name_resolver: Arc<dyn TableNameProvider>,
table_resolver: Arc<dyn TableProvider>,
metrics: Arc<metric::Registry>,
namespace_count: U64Counter,
@ -112,7 +113,7 @@ where
/// Initialise a new [`BufferTree`] that emits metrics to `metrics`.
pub(crate) fn new(
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
table_name_resolver: Arc<dyn TableNameProvider>,
table_resolver: Arc<dyn TableProvider>,
partition_provider: Arc<dyn PartitionProvider>,
post_write_observer: Arc<O>,
metrics: Arc<metric::Registry>,
@ -127,7 +128,7 @@ where
Self {
namespaces: Default::default(),
namespace_name_resolver,
table_name_resolver,
table_resolver,
metrics,
partition_provider,
post_write_observer,
@ -178,7 +179,7 @@ where
Arc::new(NamespaceData::new(
namespace_id,
Arc::new(self.namespace_name_resolver.for_namespace(namespace_id)),
Arc::clone(&self.table_name_resolver),
Arc::clone(&self.table_resolver),
Arc::clone(&self.partition_provider),
Arc::clone(&self.post_write_observer),
&self.metrics,
@ -202,6 +203,7 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
// Extract the namespace if it exists.
let inner = self
@ -211,7 +213,7 @@ where
// Delegate query execution to the namespace, wrapping the execution in
// a tracing delegate to emit a child span.
QueryExecTracing::new(inner, "namespace")
.query_exec(namespace_id, table_id, columns, span)
.query_exec(namespace_id, table_id, columns, span, predicate)
.await
}
}
@ -227,29 +229,41 @@ where
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use arrow::datatypes::DataType;
use assert_matches::assert_matches;
use data_types::{
partition_template::{test_table_partition_override, TemplatePart},
PartitionId, PartitionKey,
};
use datafusion::{
assert_batches_eq, assert_batches_sorted_eq,
prelude::{col, lit},
scalar::ScalarValue,
};
use futures::StreamExt;
use lazy_static::lazy_static;
use metric::{Attributes, Metric};
use predicate::Predicate;
use test_helpers::maybe_start_logging;
use super::*;
use crate::{
buffer_tree::{
namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceData},
partition::resolver::mock::MockPartitionProvider,
post_write::mock::MockPostWriteObserver,
table::TableName,
table::{metadata_resolver::mock::MockTableProvider, TableMetadata},
},
deferred_load::{self, DeferredLoad},
query::partition_response::PartitionResponse,
test_util::{
defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
ARBITRARY_NAMESPACE_NAME, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER,
ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER,
},
};
use assert_matches::assert_matches;
use data_types::{PartitionId, PartitionKey};
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use futures::StreamExt;
use lazy_static::lazy_static;
use metric::{Attributes, Metric};
use std::{sync::Arc, time::Duration};
const PARTITION2_ID: PartitionId = PartitionId::new(2);
const PARTITION3_ID: PartitionId = PartitionId::new(3);
@ -278,7 +292,7 @@ mod tests {
let ns = NamespaceData::new(
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_ms(),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
partition_provider,
Arc::new(MockPostWriteObserver::default()),
&metrics,
@ -337,13 +351,19 @@ mod tests {
macro_rules! test_write_query {
(
$name:ident,
partitions = [$($partition:expr), +], // The set of PartitionData for the mock partition provider
$(table_provider = $table_provider:expr,)? // An optional table provider
partitions = [$($partition:expr), +], // The set of PartitionData for the mock
// partition provider
writes = [$($write:expr), *], // The set of WriteOperation to apply()
want = $want:expr // The expected results of querying ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID
predicate = $predicate:expr, // An optional predicate to use for the query
want = $want:expr // The expected results of querying
// ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID
) => {
paste::paste! {
#[tokio::test]
async fn [<test_write_query_ $name>]() {
maybe_start_logging();
// Configure the mock partition provider with the provided
// partitions.
let partition_provider = Arc::new(MockPartitionProvider::default()
@ -352,10 +372,16 @@ mod tests {
)+
);
#[allow(unused_variables)]
let table_provider = Arc::clone(&*ARBITRARY_TABLE_PROVIDER);
$(
let table_provider: Arc<dyn TableProvider> = $table_provider;
)?
// Init the buffer tree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
table_provider,
partition_provider,
Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()),
@ -370,7 +396,13 @@ mod tests {
// Execute the query against ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID
let batches = buf
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
$predicate
)
.await
.expect("query should succeed")
.into_partition_stream()
@ -407,6 +439,7 @@ mod tests {
),
None,
)],
predicate = None,
want = [
"+----------+------+-------------------------------+",
"| region | temp | time |",
@ -456,6 +489,7 @@ mod tests {
None,
)
],
predicate = None,
want = [
"+----------+------+-------------------------------+",
"| region | temp | time |",
@ -508,6 +542,7 @@ mod tests {
None,
)
],
predicate = None,
want = [
"+--------+------+-------------------------------+",
"| region | temp | time |",
@ -520,7 +555,7 @@ mod tests {
// A query that ensures the data across multiple tables (with the same table
// name!) is correctly filtered to return only the queried table.
test_write_query!(
filter_multiple_tabls,
filter_multiple_tables,
partitions = [
PartitionDataBuilder::new()
.with_partition_id(ARBITRARY_PARTITION_ID)
@ -558,6 +593,7 @@ mod tests {
None,
)
],
predicate = None,
want = [
"+--------+------+-------------------------------+",
"| region | temp | time |",
@ -603,6 +639,7 @@ mod tests {
None,
)
],
predicate = None,
want = [
"+----------+------+-------------------------------+",
"| region | temp | time |",
@ -613,6 +650,98 @@ mod tests {
]
);
// This test asserts that the results returned from a query to the
// [`BufferTree`] filters rows from the result as directed by the
// query's [`Predicate`].
//
// It makes sure that for a [`BufferTree`] with a set of partitions split
// by some key, a query with a predicate `<partition key column> == <arbitrary literal>`
// returns partition data filtered to contain only rows which
// contain the specified value in that partition key column.
test_write_query!(
filter_by_predicate_partition_key,
table_provider = Arc::new(MockTableProvider::new(TableMetadata::new_for_testing(
ARBITRARY_TABLE_NAME.clone(),
test_table_partition_override(vec![TemplatePart::TagValue("region")])
))),
partitions = [
PartitionDataBuilder::new()
.with_partition_id(ARBITRARY_PARTITION_ID)
.with_partition_key(ARBITRARY_PARTITION_KEY.clone()) // "platanos"
.build(),
PartitionDataBuilder::new()
.with_partition_id(PARTITION2_ID)
.with_partition_key(PARTITION2_KEY.clone()) // "p2"
.build()
],
writes = [
make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region={} temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME, &*ARBITRARY_PARTITION_KEY
),
None,
),
make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},region={} temp=12 4242424242"#,
&*ARBITRARY_TABLE_NAME, &*ARBITRARY_PARTITION_KEY
),
None,
),
make_write_op(
&PARTITION2_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
2,
&format!(
r#"{},region={} temp=17 7676767676"#,
&*ARBITRARY_TABLE_NAME, *PARTITION2_KEY
),
None,
),
make_write_op(
&PARTITION2_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
3,
&format!(
r#"{},region={} temp=13 7676767676"#,
&*ARBITRARY_TABLE_NAME, *PARTITION2_KEY,
),
None,
)
],
// NOTE: The querier will coerce the type of the predicates correctly, so the ingester does NOT need to perform
// type coercion. This type should reflect that.
predicate = Some(Predicate::new().with_expr(col("region").eq(lit(
ScalarValue::Dictionary(
Box::new(DataType::Int32),
Box::new(ScalarValue::from(PARTITION2_KEY.inner()))
)
)))),
want = [
"+--------+------+-------------------------------+",
"| region | temp | time |",
"+--------+------+-------------------------------+",
"| p2 | 13.0 | 1970-01-01T00:00:07.676767676 |",
"| p2 | 17.0 | 1970-01-01T00:00:07.676767676 |",
"+--------+------+-------------------------------+",
]
);
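The NOTE above is worth seeing concretely. A minimal sketch (datafusion is the only dependency; not code from this diff) of the two literal encodings — tag columns are dictionary-encoded, so only the second matches the column's Arrow type and survives predicate evaluation:

use datafusion::arrow::datatypes::DataType;
use datafusion::scalar::ScalarValue;

fn main() {
    // Plain Utf8 literal: type-mismatched against a dictionary column.
    let _plain = ScalarValue::from("p2");
    // Dictionary(Int32 keys, Utf8 values) literal: matches the column type,
    // mirroring the construction used in the test above.
    let _dict = ScalarValue::Dictionary(
        Box::new(DataType::Int32),
        Box::new(ScalarValue::from("p2")),
    );
}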
/// Assert that multiple writes to a single namespace/table results in a
/// single namespace being created, and matching metrics.
#[tokio::test]
@ -627,7 +756,7 @@ mod tests {
)
.with_partition(
PartitionDataBuilder::new()
.with_partition_id(ARBITRARY_PARTITION_ID)
.with_partition_id(PARTITION2_ID)
.with_partition_key(PARTITION2_KEY.clone())
.build(),
),
@ -638,7 +767,7 @@ mod tests {
// Init the buffer tree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
partition_provider,
Arc::new(MockPostWriteObserver::default()),
Arc::clone(&metrics),
@ -722,9 +851,14 @@ mod tests {
.with_partition_id(PARTITION3_ID)
.with_partition_key(PARTITION3_KEY.clone())
.with_table_id(TABLE2_ID)
.with_table_name_loader(Arc::new(DeferredLoad::new(
.with_table_loader(Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async move { TableName::from(TABLE2_NAME) },
async move {
TableMetadata::new_for_testing(
TABLE2_NAME.into(),
Default::default(),
)
},
&metric::Registry::default(),
)))
.build(),
@ -734,7 +868,7 @@ mod tests {
// Init the buffer tree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
partition_provider,
Arc::new(MockPostWriteObserver::default()),
Arc::clone(&Arc::new(metric::Registry::default())),
@ -821,7 +955,7 @@ mod tests {
// Init the BufferTree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
partition_provider,
Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()),
@ -829,7 +963,13 @@ mod tests {
// Query the empty tree
let err = buf
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect_err("query should fail");
assert_matches!(err, QueryError::NamespaceNotFound(ns) => {
@ -854,7 +994,7 @@ mod tests {
// Ensure an unknown table errors
let err = buf
.query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None)
.query_exec(ARBITRARY_NAMESPACE_ID, TABLE2_ID, vec![], None, None)
.await
.expect_err("query should fail");
assert_matches!(err, QueryError::TableNotFound(ns, t) => {
@ -863,9 +1003,15 @@ mod tests {
});
// Ensure a valid namespace / table does not error
buf.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.await
.expect("namespace / table should exist");
buf.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect("namespace / table should exist");
}
/// This test asserts the read consistency properties defined in the
@ -906,7 +1052,7 @@ mod tests {
// Init the buffer tree
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
partition_provider,
Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()),
@ -931,7 +1077,13 @@ mod tests {
// Execute a query of the buffer tree, generating the result stream, but
// DO NOT consume it.
let stream = buf
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect("query should succeed")
.into_partition_stream();

View File

@ -1,13 +1,23 @@
//! Table level data buffer structures.
pub(crate) mod name_resolver;
pub(crate) mod metadata_resolver;
use std::{fmt::Debug, sync::Arc};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
use data_types::{
partition_template::{build_column_values, ColumnValue, TablePartitionTemplateOverride},
NamespaceId, PartitionKey, SequenceNumber, Table, TableId,
};
use datafusion::scalar::ScalarValue;
use iox_query::{
chunk_statistics::{create_chunk_statistics, ColumnRange},
pruning::prune_summaries,
QueryChunk,
};
use mutable_batch::MutableBatch;
use parking_lot::Mutex;
use predicate::Predicate;
use schema::Projection;
use trace::span::{Span, SpanRecorder};
@ -22,8 +32,52 @@ use crate::{
query::{
partition_response::PartitionResponse, response::PartitionStream, QueryError, QueryExec,
},
query_adaptor::QueryAdaptor,
};
/// Metadata from the catalog for a table
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TableMetadata {
name: TableName,
partition_template: TablePartitionTemplateOverride,
}
impl TableMetadata {
#[cfg(test)]
pub fn new_for_testing(
name: TableName,
partition_template: TablePartitionTemplateOverride,
) -> Self {
Self {
name,
partition_template,
}
}
pub(crate) fn name(&self) -> &TableName {
&self.name
}
pub(crate) fn partition_template(&self) -> &TablePartitionTemplateOverride {
&self.partition_template
}
}
impl From<Table> for TableMetadata {
fn from(t: Table) -> Self {
Self {
name: t.name.into(),
partition_template: t.partition_template,
}
}
}
impl std::fmt::Display for TableMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.name, f)
}
}
/// The string name / identifier of a Table.
///
/// A reference-counted, cheaply cloneable string.
@ -69,7 +123,7 @@ impl PartialEq<str> for TableName {
#[derive(Debug)]
pub(crate) struct TableData<O> {
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
catalog_table: Arc<DeferredLoad<TableMetadata>>,
/// The catalog ID of the namespace this table is being populated from.
namespace_id: NamespaceId,
@ -93,7 +147,7 @@ impl<O> TableData<O> {
/// for the first time.
pub(super) fn new(
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
catalog_table: Arc<DeferredLoad<TableMetadata>>,
namespace_id: NamespaceId,
namespace_name: Arc<DeferredLoad<NamespaceName>>,
partition_provider: Arc<dyn PartitionProvider>,
@ -101,7 +155,7 @@ impl<O> TableData<O> {
) -> Self {
Self {
table_id,
table_name,
catalog_table,
namespace_id,
namespace_name,
partition_data: Default::default(),
@ -132,9 +186,9 @@ impl<O> TableData<O> {
self.table_id
}
/// Returns the name of this table.
pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> {
&self.table_name
/// Returns the catalog data for this table.
pub(crate) fn catalog_table(&self) -> &Arc<DeferredLoad<TableMetadata>> {
&self.catalog_table
}
/// Return the [`NamespaceId`] this table is a part of.
@ -166,7 +220,7 @@ where
self.namespace_id,
Arc::clone(&self.namespace_name),
self.table_id,
Arc::clone(&self.table_name),
Arc::clone(&self.catalog_table),
)
.await;
// Add the partition to the map.
@ -204,6 +258,7 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
assert_eq!(self.table_id, table_id, "buffer tree index inconsistency");
assert_eq!(
@ -211,18 +266,21 @@ where
"buffer tree index inconsistency"
);
let table_partition_template = self.catalog_table.get().await.partition_template;
// Gather the partition data from all of the partitions in this table.
let span = SpanRecorder::new(span);
let partitions = self.partitions().into_iter().map(move |p| {
let mut span = span.child("partition read");
let (id, hash_id, completed_persistence_count, data) = {
let (id, hash_id, completed_persistence_count, data, partition_key) = {
let mut p = p.lock();
(
p.partition_id(),
p.partition_hash_id().cloned(),
p.completed_persistence_count(),
p.get_query_data(),
p.partition_key().clone(),
)
};
@ -230,6 +288,31 @@ where
Some(data) => {
assert_eq!(id, data.partition_id());
let data = Arc::new(data);
// Potentially prune out this partition if the partition
// template & derived partition key can be used to match
// against the optional predicate.
if predicate
.as_ref()
.map(|p| {
!keep_after_pruning_partition_key(
&table_partition_template,
&partition_key,
p,
&data,
)
})
.unwrap_or_default()
{
return PartitionResponse::new(
vec![],
id,
hash_id,
completed_persistence_count,
);
}
// Project the data if necessary
let columns = columns.iter().map(String::as_str).collect::<Vec<_>>();
let selection = if columns.is_empty() {
@ -252,6 +335,106 @@ where
}
}
/// Return true if `data` contains one or more rows matching `predicate`,
/// pruning based on the `partition_key` and `template`.
///
/// Returns false iff it can be proven that no row in `data` matches the
/// predicate.
fn keep_after_pruning_partition_key(
table_partition_template: &TablePartitionTemplateOverride,
partition_key: &PartitionKey,
predicate: &Predicate,
data: &QueryAdaptor,
) -> bool {
// Construct a set of per-column min/max statistics based on the partition
// key values.
let column_ranges = Arc::new(
build_column_values(table_partition_template, partition_key.inner())
.filter_map(|(col, val)| {
let range = match val {
ColumnValue::Identity(s) => {
let s = Arc::new(ScalarValue::from(s.as_ref()));
ColumnRange {
min_value: Arc::clone(&s),
max_value: s,
}
}
ColumnValue::Prefix(p) if p.is_empty() => return None,
ColumnValue::Prefix(p) => {
// If the partition only has a prefix of the tag value
// (it was truncated) then form a conservative range:
//
// # Minimum
// Use the prefix itself.
//
// Note that the minimum is inclusive.
//
// All values in the partition either:
//
// - are identical to the prefix, in which case they are
// included by the inclusive minimum, or
//
// - have the form `"<prefix><s>"`, and it holds that
// `"<prefix><s>" > "<prefix>"` for all strings
// `"<s>"`.
//
// # Maximum
// Use `"<prefix_excluding_last_char><char::max>"`.
//
// Note that the maximum is inclusive.
//
// All strings in this partition must be smaller than
// this constructed maximum, because string comparison
// is front-to-back and
// `"<prefix_excluding_last_char><char::max>" > "<prefix>"`
// holds.
let min_value = Arc::new(ScalarValue::from(p.as_ref()));
let mut chars = p.as_ref().chars().collect::<Vec<_>>();
*chars.last_mut().expect("checked that prefix is not empty") =
std::char::MAX;
let max_value = Arc::new(ScalarValue::from(
chars.into_iter().collect::<String>().as_str(),
));
ColumnRange {
min_value,
max_value,
}
}
};
Some((Arc::from(col), range))
})
.collect::<HashMap<_, _>>(),
);
let chunk_statistics = Arc::new(create_chunk_statistics(
data.num_rows(),
data.schema(),
data.ts_min_max(),
&column_ranges,
));
prune_summaries(
data.schema(),
&[(chunk_statistics, data.schema().as_arrow())],
predicate,
)
// Errors are logged by `iox_query` and are sometimes fine, e.g. for
// unimplemented DataFusion features or upstream bugs. The querier uses
// the same strategy. Pruning is a mere optimization and should not lead
// to crashes or unreadable data.
.ok()
.map(|vals| {
vals.into_iter()
.next()
.expect("one chunk in, one chunk out")
})
.unwrap_or(true)
}
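The conservative prefix range constructed above can be exercised with plain string comparisons. A standalone sketch (not part of this diff) of the min/max construction for a truncated tag value:

fn prefix_range(prefix: &str) -> (String, String) {
    // Minimum: the prefix itself (inclusive).
    let min = prefix.to_string();
    // Maximum: the prefix with its last character replaced by char::MAX
    // (inclusive), as in `keep_after_pruning_partition_key` above.
    let mut chars: Vec<char> = prefix.chars().collect();
    *chars.last_mut().expect("prefix must be non-empty") = char::MAX;
    (min, chars.into_iter().collect())
}

fn main() {
    let (min, max) = prefix_range("ban");
    // Every value beginning with the prefix falls inside [min, max]...
    for value in ["ban", "banana", "banzai"] {
        assert!(min.as_str() <= value && value <= max.as_str());
    }
    // ...while values outside the prefix can sort outside the range.
    assert!("apple" < min.as_str());
}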
#[cfg(test)]
mod tests {
use std::sync::Arc;
@ -265,7 +448,7 @@ mod tests {
post_write::mock::MockPostWriteObserver,
},
test_util::{
defer_namespace_name_1_sec, defer_table_name_1_sec, PartitionDataBuilder,
defer_namespace_name_1_sec, defer_table_metadata_1_sec, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME,
},
@ -280,7 +463,7 @@ mod tests {
let table = TableData::new(
ARBITRARY_TABLE_ID,
defer_table_name_1_sec(),
defer_table_metadata_1_sec(),
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
partition_provider,

View File

@ -4,24 +4,24 @@ use backoff::{Backoff, BackoffConfig};
use data_types::TableId;
use iox_catalog::interface::Catalog;
use super::TableName;
use super::TableMetadata;
use crate::deferred_load::DeferredLoad;
/// An abstract provider of a [`DeferredLoad`] configured to fetch the
/// [`TableName`] of the specified [`TableId`].
pub(crate) trait TableNameProvider: Send + Sync + std::fmt::Debug {
fn for_table(&self, id: TableId) -> DeferredLoad<TableName>;
/// catalog [`TableMetadata`] of the specified [`TableId`].
pub(crate) trait TableProvider: Send + Sync + std::fmt::Debug {
fn for_table(&self, id: TableId) -> DeferredLoad<TableMetadata>;
}
#[derive(Debug)]
pub(crate) struct TableNameResolver {
pub(crate) struct TableResolver {
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
metrics: Arc<metric::Registry>,
}
impl TableNameResolver {
impl TableResolver {
pub(crate) fn new(
max_smear: Duration,
catalog: Arc<dyn Catalog>,
@ -36,16 +36,16 @@ impl TableNameResolver {
}
}
/// Fetch the [`TableName`] from the [`Catalog`] for specified
/// Fetch the [`TableMetadata`] from the [`Catalog`] for the specified
/// `table_id`, retrying endlessly when errors occur.
pub(crate) async fn fetch(
table_id: TableId,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> TableName {
) -> TableMetadata {
Backoff::new(&backoff_config)
.retry_all_errors("fetch table name", || async {
let s = catalog
.retry_all_errors("fetch table", || async {
let table = catalog
.repositories()
.await
.tables()
@ -54,18 +54,17 @@ impl TableNameResolver {
.unwrap_or_else(|| {
panic!("resolving table name for non-existent table id {table_id}")
})
.name
.into();
Result::<_, iox_catalog::interface::Error>::Ok(s)
Result::<_, iox_catalog::interface::Error>::Ok(table)
})
.await
.expect("retry forever")
}
}
impl TableNameProvider for TableNameResolver {
fn for_table(&self, id: TableId) -> DeferredLoad<TableName> {
impl TableProvider for TableResolver {
fn for_table(&self, id: TableId) -> DeferredLoad<TableMetadata> {
DeferredLoad::new(
self.max_smear,
Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()),
@ -79,28 +78,33 @@ pub(crate) mod mock {
use super::*;
#[derive(Debug)]
pub(crate) struct MockTableNameProvider {
name: TableName,
pub(crate) struct MockTableProvider {
table: TableMetadata,
}
impl MockTableNameProvider {
pub(crate) fn new(name: impl Into<TableName>) -> Self {
Self { name: name.into() }
impl MockTableProvider {
pub(crate) fn new(table: impl Into<TableMetadata>) -> Self {
Self {
table: table.into(),
}
}
}
impl Default for MockTableNameProvider {
impl Default for MockTableProvider {
fn default() -> Self {
Self::new("bananas")
Self::new(TableMetadata::new_for_testing(
"bananas".into(),
Default::default(),
))
}
}
impl TableNameProvider for MockTableNameProvider {
fn for_table(&self, _id: TableId) -> DeferredLoad<TableName> {
let name = self.name.clone();
impl TableProvider for MockTableProvider {
fn for_table(&self, _id: TableId) -> DeferredLoad<TableMetadata> {
let table = self.table.clone();
DeferredLoad::new(
Duration::from_secs(1),
async { name },
async { table },
&metric::Registry::default(),
)
}
@ -129,7 +133,7 @@ mod tests {
// Populate the catalog with the namespace / table
let (_ns_id, table_id) = populate_catalog(&*catalog, NAMESPACE_NAME, TABLE_NAME).await;
let fetcher = Arc::new(TableNameResolver::new(
let fetcher = Arc::new(TableResolver::new(
Duration::from_secs(10),
Arc::clone(&catalog),
backoff_config.clone(),
@ -141,6 +145,6 @@ mod tests {
.get()
.with_timeout_panic(Duration::from_secs(5))
.await;
assert_eq!(&**got, TABLE_NAME);
assert_eq!(got.name(), TABLE_NAME);
}
}

View File

@ -29,7 +29,7 @@ use crate::{
partition::resolver::{
CatalogPartitionResolver, CoalescePartitionResolver, PartitionCache, PartitionProvider,
},
table::name_resolver::{TableNameProvider, TableNameResolver},
table::metadata_resolver::{TableProvider, TableResolver},
BufferTree,
},
dml_sink::{instrumentation::DmlSinkInstrumentation, tracing::DmlSinkTracing},
@ -246,8 +246,8 @@ where
Arc::clone(&metrics),
));
// Initialise the deferred table name resolver.
let table_name_provider: Arc<dyn TableNameProvider> = Arc::new(TableNameResolver::new(
// Initialise the deferred table metadata resolver.
let table_provider: Arc<dyn TableProvider> = Arc::new(TableResolver::new(
persist_background_fetch_time,
Arc::clone(&catalog),
BackoffConfig::default(),
@ -319,7 +319,7 @@ where
let buffer = Arc::new(BufferTree::new(
namespace_name_provider,
table_name_provider,
table_provider,
partition_provider,
Arc::new(hot_partition_persister),
Arc::clone(&metrics),

View File

@ -18,7 +18,7 @@ use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{persisting::PersistingData, PartitionData, SortKeyState},
table::TableName,
table::TableMetadata,
},
deferred_load::DeferredLoad,
persist::completion_observer::CompletedPersist,
@ -94,14 +94,14 @@ pub(super) struct Context {
// The partition key for this partition
partition_key: PartitionKey,
/// Deferred strings needed for persistence.
/// Deferred data needed for persistence.
///
/// These [`DeferredLoad`] are given a pre-fetch hint when this [`Context`]
/// is constructed to load them in the background (if not already resolved)
/// in order to avoid incurring the query latency when the values are
/// needed.
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_name: Arc<DeferredLoad<TableName>>,
table: Arc<DeferredLoad<TableMetadata>>,
/// The [`SortKey`] for the [`PartitionData`] at the time of [`Context`]
/// construction.
@ -164,7 +164,7 @@ impl Context {
partition_hash_id: guard.partition_hash_id().cloned(),
partition_key: guard.partition_key().clone(),
namespace_name: Arc::clone(guard.namespace_name()),
table_name: Arc::clone(guard.table_name()),
table: Arc::clone(guard.table()),
// Technically the sort key isn't immutable, but MUST NOT be
// changed by an external actor (by something other than code in
@ -182,7 +182,7 @@ impl Context {
// Pre-fetch the deferred values in a background thread (if not already
// resolved)
s.namespace_name.prefetch_now();
s.table_name.prefetch_now();
s.table.prefetch_now();
if let SortKeyState::Deferred(ref d) = s.sort_key {
d.prefetch_now();
}
@ -253,7 +253,7 @@ impl Context {
namespace_id = %self.namespace_id,
namespace_name = %self.namespace_name,
table_id = %self.table_id,
table_name = %self.table_name,
table = %self.table,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
total_persist_duration = ?now.duration_since(self.enqueued_at),
@ -315,7 +315,7 @@ impl Context {
self.namespace_name.as_ref()
}
pub(super) fn table_name(&self) -> &DeferredLoad<TableName> {
self.table_name.as_ref()
pub(super) fn table(&self) -> &DeferredLoad<TableMetadata> {
self.table.as_ref()
}
}

View File

@ -501,7 +501,7 @@ mod tests {
test_util::{
make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, ARBITRARY_NAMESPACE_NAME,
ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_NAME_PROVIDER,
ARBITRARY_TABLE_NAME, ARBITRARY_TABLE_PROVIDER,
},
};
@ -510,7 +510,7 @@ mod tests {
async fn new_partition(sort_key: SortKeyState) -> Arc<Mutex<PartitionData>> {
let buffer_tree = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
Arc::new(
MockPartitionProvider::default().with_partition(
PartitionDataBuilder::new()

View File

@ -48,7 +48,7 @@ mod tests {
test_util::{
make_write_op, populate_catalog, ARBITRARY_NAMESPACE_NAME,
ARBITRARY_NAMESPACE_NAME_PROVIDER, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_NAME_PROVIDER,
ARBITRARY_TABLE_PROVIDER,
},
};
@ -67,7 +67,7 @@ mod tests {
// Init the buffer tree
let buf = BufferTree::new(
Arc::clone(&*ARBITRARY_NAMESPACE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_NAME_PROVIDER),
Arc::clone(&*ARBITRARY_TABLE_PROVIDER),
Arc::new(CatalogPartitionResolver::new(Arc::clone(&catalog))),
Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()),

View File

@ -202,7 +202,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
?sort_key,
@ -218,7 +218,7 @@ where
compact_persisting_batch(
&worker_state.exec,
sort_key,
ctx.table_name().get().await,
ctx.table().get().await.name().clone(),
ctx.data().query_adaptor(),
)
.await
@ -249,7 +249,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
%object_store_id,
@ -265,7 +265,7 @@ where
namespace_id: ctx.namespace_id(),
namespace_name: Arc::clone(&*ctx.namespace_name().get().await),
table_id: ctx.table_id(),
table_name: Arc::clone(&*ctx.table_name().get().await),
table_name: Arc::clone(ctx.table().get().await.name()),
partition_key: ctx.partition_key().clone(),
compaction_level: CompactionLevel::Initial,
sort_key: Some(data_sort_key),
@ -291,7 +291,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
%object_store_id,
@ -358,7 +358,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
?new_sort_key,
@ -394,7 +394,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
expected=?old_sort_key,
@ -420,7 +420,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
expected=?old_sort_key,
@ -460,7 +460,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
?old_sort_key,
@ -488,7 +488,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
%object_store_id,
@ -512,7 +512,7 @@ where
namespace_id = %ctx.namespace_id(),
namespace_name = %ctx.namespace_name(),
table_id = %ctx.table_id(),
table_name = %ctx.table_name(),
table = %ctx.table(),
partition_id = %ctx.partition_id(),
partition_key = %ctx.partition_key(),
%object_store_id,

View File

@ -4,6 +4,7 @@ use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use predicate::Predicate;
use trace::span::Span;
use super::QueryExec;
@ -64,12 +65,13 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
let t = self.time_provider.now();
let res = self
.inner
.query_exec(namespace_id, table_id, columns, span)
.query_exec(namespace_id, table_id, columns, span, predicate)
.await;
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
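The hunk above is the decorator pattern: the metrics layer implements the same `QueryExec` trait as its inner executor, forwards the call (now including the predicate) unchanged, and records the elapsed time. A synchronous sketch of that shape, with a hypothetical `Exec` trait and a `Vec<Duration>` standing in for the real `DurationHistogram`:

use std::cell::RefCell;
use std::time::{Duration, Instant};

/// Hypothetical, synchronous version of the trait; the real `QueryExec`
/// is async and takes namespace/table IDs, columns, a span, and a predicate.
trait Exec {
    fn exec(&self, query: &str) -> Result<Vec<String>, String>;
}

struct Inner;

impl Exec for Inner {
    fn exec(&self, _query: &str) -> Result<Vec<String>, String> {
        Ok(vec!["row".to_string()])
    }
}

/// The decorator: same trait, an inner executor, plus recorded durations.
struct Timed<T: Exec> {
    inner: T,
    durations: RefCell<Vec<Duration>>, // stand-in for a DurationHistogram
}

impl<T: Exec> Exec for Timed<T> {
    fn exec(&self, query: &str) -> Result<Vec<String>, String> {
        let t = Instant::now();
        let res = self.inner.exec(query); // forward every argument unchanged
        self.durations.borrow_mut().push(t.elapsed());
        res
    }
}

fn main() {
    let exec = Timed {
        inner: Inner,
        durations: RefCell::new(vec![]),
    };
    exec.exec("select 1").unwrap();
    println!("recorded {} timings", exec.durations.borrow().len());
}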
@ -113,7 +115,7 @@ mod tests {
// Call the decorator and assert the return value
let got = decorator
.query_exec(NamespaceId::new(42), TableId::new(24), vec![], None)
.query_exec(NamespaceId::new(42), TableId::new(24), vec![], None, None)
.await;
assert_matches!(got, $($want_ret)+);

View File

@ -1,6 +1,7 @@
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use parking_lot::Mutex;
use predicate::Predicate;
use trace::span::Span;
use super::{response::QueryResponse, QueryError, QueryExec};
@ -27,6 +28,7 @@ impl QueryExec for MockQueryExec {
_table_id: TableId,
_columns: Vec<String>,
_span: Option<Span>,
_predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
self.response
.lock()

View File

@ -58,6 +58,7 @@ use iox_time::{SystemProvider, Time, TimeProvider};
use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions};
use observability_deps::tracing::debug;
use pin_project::{pin_project, pinned_drop};
use predicate::Predicate;
use trace::span::Span;
use crate::query::{
@ -204,12 +205,15 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
let started_at = self.time_provider.now();
// TODO(savage): Would accepting a predicate here require additional
// metrics to be added?
let stream = self
.inner
.query_exec(namespace_id, table_id, columns, span)
.query_exec(namespace_id, table_id, columns, span, predicate)
.await?;
let stream = QueryMetricContext::new(
@ -467,7 +471,13 @@ mod tests {
.with_time_provider(Arc::clone(&mock_time));
let response = layer
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect("query should succeed");
@ -548,7 +558,13 @@ mod tests {
.with_time_provider(Arc::clone(&mock_time));
let response = layer
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect("query should succeed");
@ -628,7 +644,13 @@ mod tests {
.with_time_provider(Arc::clone(&mock_time));
let response = layer
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect("query should succeed");
@ -708,7 +730,13 @@ mod tests {
.with_time_provider(Arc::clone(&mock_time));
let response = layer
.query_exec(ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, vec![], None)
.query_exec(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_TABLE_ID,
vec![],
None,
None,
)
.await
.expect("query should succeed");

View File

@ -2,6 +2,7 @@ use std::borrow::Cow;
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use predicate::Predicate;
use trace::span::{Span, SpanRecorder};
use super::QueryExec;
@ -42,12 +43,19 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
let mut recorder = SpanRecorder::new(span).child(self.name.clone());
match self
.inner
.query_exec(namespace_id, table_id, columns, recorder.span().cloned())
.query_exec(
namespace_id,
table_id,
columns,
recorder.span().cloned(),
predicate,
)
.await
{
Ok(v) => {
@ -111,6 +119,7 @@ mod tests {
TableId::new(24),
vec![],
Some(span.child("root span")),
None,
)
.await
.expect("wrapper should not modify result");
@ -134,6 +143,7 @@ mod tests {
TableId::new(24),
vec![],
Some(span.child("root span")),
None,
)
.await
.expect_err("wrapper should not modify result");
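Same decorator shape again, but for tracing: the wrapper opens a child span, forwards the call (predicate included), and marks the span ok or error from the result. A sketch with a local stand-in for `trace::span::SpanRecorder`; names and the synchronous signature are illustrative only:

/// Local stand-in for `trace::span::SpanRecorder`.
#[derive(Debug)]
struct SpanRecorder {
    name: String,
    status: Option<&'static str>,
}

impl SpanRecorder {
    fn child(&self, name: &str) -> SpanRecorder {
        SpanRecorder {
            name: format!("{}/{name}", self.name),
            status: None,
        }
    }

    fn ok(&mut self) {
        self.status = Some("ok");
    }

    fn error(&mut self) {
        self.status = Some("error");
    }
}

/// Wrap an inner call in a child span and mark the span from the result.
fn traced_call(
    parent: &SpanRecorder,
    inner: impl Fn() -> Result<u32, String>,
) -> Result<u32, String> {
    let mut recorder = parent.child("query_exec");
    let res = inner();
    match &res {
        Ok(_) => recorder.ok(),
        Err(_) => recorder.error(),
    }
    println!("{recorder:?}");
    res
}

fn main() {
    let root = SpanRecorder { name: "root".into(), status: None };
    let _ = traced_call(&root, || Ok(42));
    let _ = traced_call(&root, || Err("boom".into()));
}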

View File

@ -2,6 +2,7 @@ use std::{fmt::Debug, ops::Deref, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use predicate::Predicate;
use thiserror::Error;
use trace::span::Span;
@ -25,6 +26,7 @@ pub(crate) trait QueryExec: Send + Sync + Debug {
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError>;
}
@ -41,9 +43,10 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
predicate: Option<Predicate>,
) -> Result<Self::Response, QueryError> {
self.deref()
.query_exec(namespace_id, table_id, columns, span)
.query_exec(namespace_id, table_id, columns, span, predicate)
.await
}
}
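The blanket impl above forwards the trait through `Deref`, so smart-pointer wrappers stay transparent to callers. A synchronous sketch of that forwarding, written for `Arc<T>` and a hypothetical simplified trait (the real one is async and returns a `QueryResponse`):

use std::ops::Deref;
use std::sync::Arc;

/// Hypothetical simplified trait; the real one is async and fallible.
trait QueryExec {
    fn query_exec(&self, columns: Vec<String>) -> usize;
}

struct Engine;

impl QueryExec for Engine {
    fn query_exec(&self, columns: Vec<String>) -> usize {
        columns.len()
    }
}

/// Forward the trait through the smart pointer to the inner executor.
impl<T> QueryExec for Arc<T>
where
    T: QueryExec,
{
    fn query_exec(&self, columns: Vec<String>) -> usize {
        self.deref().query_exec(columns)
    }
}

fn main() {
    let exec: Arc<Engine> = Arc::new(Engine);
    // Callers can hold an `Arc` and still use the trait directly.
    assert_eq!(exec.query_exec(vec!["a".into(), "b".into()]), 2);
}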

View File

@ -5,7 +5,7 @@ use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{ChunkId, ChunkOrder, PartitionId};
use data_types::{ChunkId, ChunkOrder, PartitionId, TimestampMinMax};
use datafusion::physical_plan::Statistics;
use iox_query::{
util::{compute_timenanosecond_min_max, create_basic_summary},
@ -105,16 +105,26 @@ impl QueryAdaptor {
pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
}
/// Number of rows, useful for building stats
pub(crate) fn num_rows(&self) -> u64 {
self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64
}
/// Time range, useful for building stats
pub(crate) fn ts_min_max(&self) -> TimestampMinMax {
compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref()))
.expect("Should have time range")
}
}
impl QueryChunk for QueryAdaptor {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(self.stats.get_or_init(|| {
let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref()))
.expect("Should have time range");
let ts_min_max = self.ts_min_max();
Arc::new(create_basic_summary(
self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64,
self.num_rows(),
self.schema(),
ts_min_max,
))
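The `get_or_init` above makes the chunk statistics lazy: they are computed from the buffered batches on first access and cached for every later one, and the refactor exposes `num_rows()`/`ts_min_max()` so other stats builders can reuse them. A sketch of that shape, with plain vectors of nanosecond timestamps standing in for Arrow record batches:

use std::sync::OnceLock;

/// Plain vectors of nanosecond timestamps stand in for `RecordBatch`es.
struct Adaptor {
    data: Vec<Vec<i64>>,
    stats: OnceLock<Stats>,
}

#[derive(Debug)]
struct Stats {
    num_rows: u64,
    ts_min: i64,
    ts_max: i64,
}

impl Adaptor {
    fn num_rows(&self) -> u64 {
        self.data.iter().map(|b| b.len()).sum::<usize>() as u64
    }

    fn ts_min_max(&self) -> (i64, i64) {
        let all = self.data.iter().flatten();
        let min = all.clone().min().copied().expect("should have time range");
        let max = all.max().copied().expect("should have time range");
        (min, max)
    }

    /// Computed on first access, cached for every later one.
    fn stats(&self) -> &Stats {
        self.stats.get_or_init(|| {
            let (ts_min, ts_max) = self.ts_min_max();
            Stats {
                num_rows: self.num_rows(),
                ts_min,
                ts_max,
            }
        })
    }
}

fn main() {
    let a = Adaptor {
        data: vec![vec![3, 1], vec![7]],
        stats: OnceLock::new(),
    };
    println!("{:?}", a.stats()); // Stats { num_rows: 3, ts_min: 1, ts_max: 7 }
}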

View File

@ -12,6 +12,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use ingester_query_grpc::influxdata::iox::ingester::v1 as proto;
use metric::{DurationHistogram, U64Counter};
use observability_deps::tracing::*;
use predicate::Predicate;
use prost::Message;
use thiserror::Error;
use tokio::sync::{Semaphore, TryAcquireError};
@ -48,6 +49,10 @@ enum Error {
/// The number of simultaneous queries being executed has been reached.
#[error("simultaneous query limit exceeded")]
RequestLimit,
/// The payload within the request has an invalid field value.
#[error("field violation: {0}")]
FieldViolation(#[from] ingester_query_grpc::FieldViolation),
}
/// Map a query-execution error into a [`tonic::Status`].
@ -77,6 +82,10 @@ impl From<Error> for tonic::Status {
warn!("simultaneous query limit exceeded");
Code::ResourceExhausted
}
Error::FieldViolation(_) => {
debug!(error=%e, "request contains field violation");
Code::InvalidArgument
}
};
Self::new(code, e.to_string())
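The new `FieldViolation` variant follows the existing error-mapping convention here: each variant is logged at an appropriate level and translated to a gRPC status code. A condensed, compilable sketch of just that mapping (assumes the `tonic` and `thiserror` crates, both already in use in this file; logging elided):

use tonic::{Code, Status};

#[derive(Debug, thiserror::Error)]
enum Error {
    #[error("simultaneous query limit exceeded")]
    RequestLimit,

    #[error("field violation: {0}")]
    FieldViolation(String),
}

impl From<Error> for Status {
    fn from(e: Error) -> Self {
        // The real code logs each arm (warn!/debug!); here we only map.
        let code = match &e {
            Error::RequestLimit => Code::ResourceExhausted,
            Error::FieldViolation(_) => Code::InvalidArgument,
        };
        Status::new(code, e.to_string())
    }
}

fn main() {
    let status = Status::from(Error::FieldViolation("bad predicate".into()));
    assert_eq!(status.code(), Code::InvalidArgument);
    println!("{status}");
}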
@ -188,18 +197,25 @@ where
let ticket = request.into_inner();
let request = proto::IngesterQueryRequest::decode(&*ticket.ticket).map_err(Error::from)?;
// Extract the namespace/table identifiers
// Extract the namespace/table identifiers and the query predicate
let namespace_id = NamespaceId::new(request.namespace_id);
let table_id = TableId::new(request.table_id);
// Predicate pushdown is part of the API, but not implemented.
if let Some(p) = request.predicate {
debug!(predicate=?p, "ignoring query predicate (unsupported)");
}
let predicate = if let Some(p) = request.predicate {
debug!(predicate=?p, "received query predicate");
Some(Predicate::try_from(p).map_err(Error::from)?)
} else {
None
};
let response = match self
.query_handler
.query_exec(namespace_id, table_id, request.columns, span.clone())
.query_exec(
namespace_id,
table_id,
request.columns,
span.clone(),
predicate,
)
.await
{
Ok(v) => v,
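The predicate extraction above is the classic optional-fallible-conversion shape: `None` passes through, `Some(proto)` must convert cleanly or the whole request is rejected with the `InvalidArgument` mapping shown earlier. `Option::map` plus `transpose` expresses the same contract; a sketch with hypothetical stand-ins for the proto and query-layer predicate types:

/// Hypothetical stand-ins for the proto predicate and the query-layer type.
struct ProtoPredicate {
    expr: String,
}

#[derive(Debug)]
struct Predicate {
    expr: String,
}

#[derive(Debug)]
struct FieldViolation(String);

impl TryFrom<ProtoPredicate> for Predicate {
    type Error = FieldViolation;

    fn try_from(p: ProtoPredicate) -> Result<Self, Self::Error> {
        if p.expr.is_empty() {
            return Err(FieldViolation("empty predicate expression".into()));
        }
        Ok(Predicate { expr: p.expr })
    }
}

/// `None` stays `None`; `Some(proto)` must convert or the request fails.
fn extract(predicate: Option<ProtoPredicate>) -> Result<Option<Predicate>, FieldViolation> {
    predicate.map(Predicate::try_from).transpose()
}

fn main() {
    assert!(extract(None).unwrap().is_none());
    assert!(extract(Some(ProtoPredicate { expr: "x > 1".into() })).unwrap().is_some());
    assert!(extract(Some(ProtoPredicate { expr: String::new() })).is_err());
}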

View File

@ -1,6 +1,9 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
use data_types::{
partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey,
SequenceNumber, TableId,
};
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
use lazy_static::lazy_static;
use mutable_batch_lp::lines_to_batches;
@ -15,8 +18,8 @@ use crate::{
},
partition::{PartitionData, SortKeyState},
table::{
name_resolver::{mock::MockTableNameProvider, TableNameProvider},
TableName,
metadata_resolver::{mock::MockTableProvider, TableProvider},
TableMetadata, TableName,
},
},
deferred_load::DeferredLoad,
@ -44,10 +47,15 @@ pub(crate) fn defer_namespace_name_1_ms() -> Arc<DeferredLoad<NamespaceName>> {
))
}
pub(crate) fn defer_table_name_1_sec() -> Arc<DeferredLoad<TableName>> {
pub(crate) fn defer_table_metadata_1_sec() -> Arc<DeferredLoad<TableMetadata>> {
Arc::new(DeferredLoad::new(
Duration::from_secs(1),
async { ARBITRARY_TABLE_NAME.clone() },
async {
TableMetadata::new_for_testing(
ARBITRARY_TABLE_NAME.clone(),
TablePartitionTemplateOverride::default(),
)
},
&metric::Registry::default(),
))
}
@ -60,8 +68,11 @@ lazy_static! {
pub(crate) static ref ARBITRARY_NAMESPACE_NAME_PROVIDER: Arc<dyn NamespaceNameProvider> =
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME));
pub(crate) static ref ARBITRARY_TABLE_NAME: TableName = TableName::from("bananas");
pub(crate) static ref ARBITRARY_TABLE_NAME_PROVIDER: Arc<dyn TableNameProvider> =
Arc::new(MockTableNameProvider::new(&**ARBITRARY_TABLE_NAME));
pub(crate) static ref ARBITRARY_TABLE_PROVIDER: Arc<dyn TableProvider> =
Arc::new(MockTableProvider::new(TableMetadata::new_for_testing(
ARBITRARY_TABLE_NAME.clone(),
TablePartitionTemplateOverride::default()
)));
}
/// Build a [`PartitionData`] with mostly arbitrary-yet-valid values for tests.
@ -71,7 +82,7 @@ pub(crate) struct PartitionDataBuilder {
partition_key: Option<PartitionKey>,
namespace_id: Option<NamespaceId>,
table_id: Option<TableId>,
table_name_loader: Option<Arc<DeferredLoad<TableName>>>,
table_loader: Option<Arc<DeferredLoad<TableMetadata>>>,
namespace_loader: Option<Arc<DeferredLoad<NamespaceName>>>,
sort_key: Option<SortKeyState>,
}
@ -101,11 +112,11 @@ impl PartitionDataBuilder {
self
}
pub(crate) fn with_table_name_loader(
pub(crate) fn with_table_loader(
mut self,
table_name_loader: Arc<DeferredLoad<TableName>>,
table_loader: Arc<DeferredLoad<TableMetadata>>,
) -> Self {
self.table_name_loader = Some(table_name_loader);
self.table_loader = Some(table_loader);
self
}
@ -134,8 +145,7 @@ impl PartitionDataBuilder {
self.namespace_loader
.unwrap_or_else(defer_namespace_name_1_sec),
self.table_id.unwrap_or(ARBITRARY_TABLE_ID),
self.table_name_loader
.unwrap_or_else(defer_table_name_1_sec),
self.table_loader.unwrap_or_else(defer_table_metadata_1_sec),
self.sort_key.unwrap_or(SortKeyState::Provided(None)),
)
}
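The builder above keeps every field optional and backfills arbitrary-yet-valid defaults at `build()` time, so tests override only what they assert on. A sketch of the pattern with a hypothetical two-field builder:

#[derive(Debug)]
struct Partition {
    key: String,
    table: String,
}

#[derive(Default)]
struct PartitionBuilder {
    key: Option<String>,
    table: Option<String>,
}

impl PartitionBuilder {
    fn with_table(mut self, table: impl Into<String>) -> Self {
        self.table = Some(table.into());
        self
    }

    fn build(self) -> Partition {
        Partition {
            // Arbitrary-yet-valid defaults, overridable per test.
            key: self.key.unwrap_or_else(|| "arbitrary-key".to_string()),
            table: self.table.unwrap_or_else(|| "bananas".to_string()),
        }
    }
}

fn main() {
    // Tests override only what they assert on; everything else defaults.
    let p = PartitionBuilder::default().with_table("platanos").build();
    println!("{p:?}");
}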

View File

@ -34,6 +34,7 @@ use schema::{
};
use std::{any::Any, fmt::Debug, sync::Arc};
pub mod chunk_statistics;
pub mod config;
pub mod exec;
pub mod frontend;

View File

@ -17,6 +17,7 @@ use data_types::{
};
use datafusion::scalar::ScalarValue;
use iox_catalog::interface::Catalog;
use iox_query::chunk_statistics::{ColumnRange, ColumnRanges};
use iox_time::TimeProvider;
use observability_deps::tracing::debug;
use schema::sort::SortKey;
@ -27,8 +28,6 @@ use std::{
};
use trace::span::Span;
use crate::df_stats::{ColumnRange, ColumnRanges};
use super::{namespace::CachedTable, ram::RamSize};
const CACHE_ID: &str = "partition";

View File

@ -6,10 +6,7 @@ use self::{
invalidate_on_error::InvalidateOnErrorFlightClient,
test_util::MockIngesterConnection,
};
use crate::{
cache::{namespace::CachedTable, CatalogCache},
df_stats::{create_chunk_statistics, ColumnRanges},
};
use crate::cache::{namespace::CachedTable, CatalogCache};
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use arrow_flight::decode::DecodedPayload;
use async_trait::async_trait;
@ -22,7 +19,11 @@ use ingester_query_grpc::{
encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata,
IngesterQueryRequest,
};
use iox_query::{util::compute_timenanosecond_min_max, QueryChunk, QueryChunkData};
use iox_query::{
chunk_statistics::{create_chunk_statistics, ColumnRanges},
util::compute_timenanosecond_min_max,
QueryChunk, QueryChunkData,
};
use iox_time::{Time, TimeProvider};
use metric::{DurationHistogram, Metric};
use observability_deps::tracing::{debug, trace, warn};

View File

@ -18,7 +18,6 @@ use workspace_hack as _;
mod cache;
mod database;
mod df_stats;
mod ingester;
mod namespace;
mod parquet;

View File

@ -2,6 +2,7 @@
use data_types::{ChunkId, ChunkOrder, PartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges};
use parquet_file::chunk::ParquetChunk;
use schema::sort::SortKey;
use std::sync::Arc;
@ -11,8 +12,6 @@ mod query_access;
pub use creation::ChunkAdapter;
use crate::df_stats::{create_chunk_statistics, ColumnRanges};
/// Immutable metadata attached to a [`QuerierParquetChunk`].
#[derive(Debug)]
pub struct QuerierParquetChunkMeta {

View File

@ -492,7 +492,6 @@ mod tests {
use super::*;
use crate::{
cache::test_util::{assert_cache_access_metric_count, assert_catalog_access_metric_count},
df_stats::ColumnRange,
ingester::{test_util::MockIngesterConnection, IngesterPartition},
table::test_util::{querier_table, IngesterPartitionBuilder},
};
@ -506,7 +505,7 @@ mod tests {
use generated_types::influxdata::iox::partition_template::v1::{
template_part::Part, PartitionTemplate, TemplatePart,
};
use iox_query::exec::IOxSessionContext;
use iox_query::{chunk_statistics::ColumnRange, exec::IOxSessionContext};
use iox_tests::{TestCatalog, TestParquetFileBuilder, TestTable};
use predicate::Predicate;
use schema::{builder::SchemaBuilder, InfluxFieldType, TIME_COLUMN_NAME};

View File

@ -1,11 +1,12 @@
use super::{PruneMetrics, QuerierTable, QuerierTableArgs};
use crate::{
cache::CatalogCache, create_ingester_connection_for_testing, df_stats::ColumnRanges,
parquet::ChunkAdapter, IngesterPartition,
cache::CatalogCache, create_ingester_connection_for_testing, parquet::ChunkAdapter,
IngesterPartition,
};
use arrow::record_batch::RecordBatch;
use data_types::ChunkId;
use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows};
use iox_query::chunk_statistics::ColumnRanges;
use iox_tests::{TestCatalog, TestPartition, TestTable};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::{Projection, Schema};

View File

@ -481,24 +481,43 @@ impl MiniCluster {
.await?
.into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{msg:?}");
let schema = next_message(&mut performed_query)
.await
.map(|(msg, _)| unwrap_schema(msg));
let mut record_batches = vec![];
while let Some((msg, _md)) = next_message(&mut performed_query).await {
let batch = unwrap_record_batch(msg);
record_batches.push(batch);
let mut partitions = vec![];
let mut current_partition = None;
while let Some((msg, app_metadata)) = next_message(&mut performed_query).await {
match msg {
DecodedPayload::None => {
if let Some(p) = std::mem::take(&mut current_partition) {
partitions.push(p);
}
current_partition = Some(IngesterResponsePartition {
app_metadata,
schema: None,
record_batches: vec![],
});
}
DecodedPayload::Schema(schema) => {
let current_partition =
current_partition.as_mut().expect("schema w/o partition");
assert!(
current_partition.schema.is_none(),
"got two schemas for a single partition"
);
current_partition.schema = Some(schema);
}
DecodedPayload::RecordBatch(batch) => {
let current_partition =
current_partition.as_mut().expect("batch w/o partition");
assert!(current_partition.schema.is_some(), "batch w/o schema");
current_partition.record_batches.push(batch);
}
}
}
Ok(IngesterResponse {
app_metadata,
schema,
record_batches,
})
if let Some(p) = current_partition {
partitions.push(p);
}
Ok(IngesterResponse { partitions })
}
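The loop above is a small state machine over the Flight stream: a metadata-only (`None`) payload opens a new partition, a schema must arrive before any batch, and batches accumulate until the next boundary. A dependency-free sketch of the same regrouping, with strings standing in for the Arrow metadata/schema/batch payloads:

#[derive(Debug)]
enum Payload {
    None(&'static str), // partition metadata
    Schema(&'static str),
    RecordBatch(&'static str),
}

#[derive(Debug, Default)]
struct Partition {
    meta: &'static str,
    schema: Option<&'static str>,
    batches: Vec<&'static str>,
}

fn regroup(stream: Vec<Payload>) -> Vec<Partition> {
    let mut partitions = vec![];
    let mut current: Option<Partition> = None;
    for msg in stream {
        match msg {
            Payload::None(meta) => {
                // A metadata-only message marks a partition boundary.
                if let Some(p) = current.take() {
                    partitions.push(p);
                }
                current = Some(Partition { meta, ..Default::default() });
            }
            Payload::Schema(s) => {
                let p = current.as_mut().expect("schema w/o partition");
                assert!(p.schema.is_none(), "got two schemas for a single partition");
                p.schema = Some(s);
            }
            Payload::RecordBatch(b) => {
                let p = current.as_mut().expect("batch w/o partition");
                assert!(p.schema.is_some(), "batch w/o schema");
                p.batches.push(b);
            }
        }
    }
    // Don't drop the partition that was still being accumulated.
    if let Some(p) = current {
        partitions.push(p);
    }
    partitions
}

fn main() {
    let parts = regroup(vec![
        Payload::None("p1"),
        Payload::Schema("s"),
        Payload::RecordBatch("b1"),
        Payload::None("p2"),
        Payload::Schema("s"),
    ]);
    assert_eq!(parts.len(), 2);
    println!("{parts:?}");
}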
/// Ask all of the ingesters to persist their data.
@ -589,6 +608,11 @@ impl MiniCluster {
/// Gathers data from ingester Flight queries
#[derive(Debug)]
pub struct IngesterResponse {
pub partitions: Vec<IngesterResponsePartition>,
}
#[derive(Debug)]
pub struct IngesterResponsePartition {
pub app_metadata: IngesterQueryResponseMetadata,
pub schema: Option<SchemaRef>,
pub record_batches: Vec<RecordBatch>,
@ -708,17 +732,3 @@ async fn next_message(
Some((payload, app_metadata))
}
fn unwrap_schema(msg: DecodedPayload) -> SchemaRef {
match msg {
DecodedPayload::Schema(s) => s,
_ => panic!("Unexpected message type: {msg:?}"),
}
}
fn unwrap_record_batch(msg: DecodedPayload) -> RecordBatch {
match msg {
DecodedPayload::RecordBatch(b) => b,
_ => panic!("Unexpected message type: {msg:?}"),
}
}