From 3588a066475dd91ec41f3875f12d4edb0c0a6883 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 19 Apr 2022 13:09:40 +0200
Subject: [PATCH] feat: correctly dismantle ingester gRPC response in querier (#4323)

This now correctly processes record batches for the different partitions.
The actual code change is rather small but it requires some substantial
test infrastructure.
---
 querier/src/ingester/flight_client.rs |  63 +++-
 querier/src/ingester/mod.rs           | 450 +++++++++++++++++++++++---
 2 files changed, 458 insertions(+), 55 deletions(-)
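The heart of the change, sketched in isolation: batches arrive as one flat Flight stream, and the response metadata carries one partition id per batch, so the querier can group them per partition. A minimal sketch against the `QueryData` trait introduced in flight_client.rs below; the helper name and the plain `i64` keys are illustrative, not part of the patch:

    use std::collections::HashMap;

    use arrow::record_batch::RecordBatch;

    /// Groups a flight stream's batches by partition using the
    /// one-id-per-batch mapping shipped in the response metadata.
    async fn group_by_partition(
        mut query: impl QueryData,
    ) -> Result<HashMap<i64, Vec<RecordBatch>>, FlightError> {
        let partition_ids = query.app_metadata().batch_partition_ids.clone();
        let mut grouped: HashMap<i64, Vec<RecordBatch>> = HashMap::new();

        let mut pos = 0;
        while let Some(batch) = query.next().await? {
            // ids and batches line up by position in the stream
            let id = *partition_ids.get(pos).expect("one id per batch");
            grouped.entry(id).or_default().push(batch);
            pos += 1;
        }
        Ok(grouped)
    }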
diff --git a/querier/src/ingester/flight_client.rs b/querier/src/ingester/flight_client.rs
index a16911a614..7cb4c03409 100644
--- a/querier/src/ingester/flight_client.rs
+++ b/querier/src/ingester/flight_client.rs
@@ -1,5 +1,10 @@
-use std::fmt::Debug;
+use std::{
+    fmt::Debug,
+    ops::{Deref, DerefMut},
+    sync::Arc,
+};
 
+use arrow::{datatypes::Schema, record_batch::RecordBatch};
 use async_trait::async_trait;
 use client_util::connection;
 use data_types2::IngesterQueryRequest;
@@ -43,7 +48,7 @@ pub trait FlightClient: Debug + Send + Sync + 'static {
         &self,
         ingester_address: &str,
         request: IngesterQueryRequest,
-    ) -> Result<PerformQuery<IngesterQueryResponseMetadata>, Error>;
+    ) -> Result<Box<dyn QueryData>, Error>;
 }
 
 /// Default [`FlightClient`] implementation that uses a real client.
@@ -88,7 +93,7 @@ impl FlightClient for FlightClientImpl {
         &self,
         ingester_addr: &str,
         request: IngesterQueryRequest,
-    ) -> Result<PerformQuery<IngesterQueryResponseMetadata>, Error> {
+    ) -> Result<Box<dyn QueryData>, Error> {
         // TODO maybe cache this connection
         let mut client = self.connect(ingester_addr).await?;
 
@@ -96,6 +101,56 @@ impl FlightClient for FlightClientImpl {
         let request: flight::generated_types::IngesterQueryRequest =
             request.try_into().context(CreatingRequestSnafu)?;
 
-        client.perform_query(request).await.context(FlightSnafu)
+        let perform_query = client.perform_query(request).await.context(FlightSnafu)?;
+        Ok(Box::new(perform_query))
+    }
+}
+
+/// Data that is returned by an ingester gRPC query.
+///
+/// This is mostly the same as [`PerformQuery`] but allows some easier mocking.
+#[async_trait]
+pub trait QueryData: Debug + Send + 'static {
+    /// Returns the next `RecordBatch` available for this query, or `None` if
+    /// there are no further results available.
+    async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError>;
+
+    /// App metadata that was part of the response.
+    fn app_metadata(&self) -> &IngesterQueryResponseMetadata;
+
+    /// Schema.
+    fn schema(&self) -> Arc<Schema>;
+}
+
+#[async_trait]
+impl<T> QueryData for Box<T>
+where
+    T: QueryData + ?Sized,
+{
+    async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
+        self.deref_mut().next().await
+    }
+
+    fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
+        self.deref().app_metadata()
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.deref().schema()
+    }
+}
+
+#[async_trait]
+impl QueryData for PerformQuery<IngesterQueryResponseMetadata> {
+    async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
+        self.next().await
+    }
+
+    fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
+        self.app_metadata()
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema()
     }
 }
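The blanket impl above is what keeps `Box<dyn QueryData>` usable as a `QueryData` itself, so generic helpers accept both the real `PerformQuery` and boxed mocks. A small illustration (`drain` and `example` are hypothetical helpers, not part of the patch):

    /// Counts the remaining batches without buffering them.
    async fn drain<Q: QueryData>(mut query: Q) -> Result<usize, FlightError> {
        let mut n = 0;
        while query.next().await?.is_some() {
            n += 1;
        }
        Ok(n)
    }

    async fn example(query: Box<dyn QueryData>) -> Result<usize, FlightError> {
        // `Box<dyn QueryData>: QueryData` holds thanks to the blanket impl,
        // so the boxed trait object returned by `FlightClient::query` fits.
        drain(query).await
    }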
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 24ae0e2603..c82f9bbd6f 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -1,9 +1,9 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
 use async_trait::async_trait;
 use data_types2::{
-    ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, SequenceNumber, TableSummary,
+    ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, PartitionId, SequenceNumber, TableSummary,
 };
 use datafusion_util::MemoryStream;
 use observability_deps::tracing::{debug, trace};
@@ -13,7 +13,7 @@ use query::{
     QueryChunk, QueryChunkError, QueryChunkMeta,
 };
 use schema::{selection::Selection, sort::SortKey, Schema};
-use snafu::{ResultExt, Snafu};
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
 
 use self::{
     flight_client::{Error as FlightClientError, FlightClient, FlightClientImpl, FlightError},
@@ -48,6 +48,34 @@ pub enum Error {
         ingester_address: String,
         source: FlightClientError,
     },
+
+    #[snafu(display("Batch {} is missing partition id from '{}'", pos, ingester_address))]
+    MissingPartitionId {
+        pos: usize,
+        ingester_address: String,
+    },
+
+    #[snafu(display(
+        "Got too many partition IDs from '{}', expected {} but got {}",
+        ingester_address,
+        expected,
+        actual
+    ))]
+    TooManyPartitionIds {
+        expected: usize,
+        actual: usize,
+        ingester_address: String,
+    },
+
+    #[snafu(display(
+        "Got batch for partition id {} that was not marked as unpersisted from '{}'",
+        partition_id,
+        ingester_address,
+    ))]
+    UnknownPartitionId {
+        partition_id: i64,
+        ingester_address: String,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -116,7 +144,7 @@ impl IngesterConnection for IngesterConnectionImpl {
         predicate: &Predicate,
         expected_schema: Arc<Schema>,
     ) -> Result<Vec<Arc<IngesterPartition>>> {
-        let mut partitions = vec![];
+        let mut ingester_partitions = vec![];
 
         // TODO make these requests in parallel
         for ingester_address in &self.ingester_addresses {
@@ -147,30 +175,71 @@ impl IngesterConnection for IngesterConnectionImpl {
             }
 
             let mut perform_query = query_res.context(RemoteQuerySnafu { ingester_address })?;
 
-            // Gather up all the results (todo pass along partition information in metadata)
-            let batches = perform_query.collect().await.expect("collecting");
-            debug!(
-                num_batches = batches.len(),
-                "Received batches from ingester"
+            // read unpersisted partitions
+            // map partition_id -> (PartitionMetadata, Vec<RecordBatch>)
+            let mut partitions: HashMap<_, _> = perform_query
+                .app_metadata()
+                .unpersisted_partitions
+                .iter()
+                .map(|(id, state)| (*id, (state.clone(), vec![])))
+                .collect();
+
+            // sort batches into partitions
+            let mut num_batches = 0usize;
+            let partition_ids = perform_query.app_metadata().batch_partition_ids.clone();
+            while let Some(batch) = perform_query
+                .next()
+                .await
+                .map_err(|source| FlightClientError::Flight { source })
+                .context(RemoteQuerySnafu { ingester_address })?
+            {
+                let partition_id =
+                    *partition_ids
+                        .get(num_batches)
+                        .context(MissingPartitionIdSnafu {
+                            pos: num_batches,
+                            ingester_address,
+                        })?;
+                partitions
+                    .get_mut(&partition_id)
+                    .context(UnknownPartitionIdSnafu {
+                        partition_id,
+                        ingester_address,
+                    })?
+                    .1
+                    .push(batch);
+                num_batches += 1;
+            }
+            debug!(num_batches, "Received batches from ingester");
+            trace!(?partitions, schema=?perform_query.schema(), "Detailed from ingester");
+            ensure!(
+                num_batches == partition_ids.len(),
+                TooManyPartitionIdsSnafu {
+                    actual: partition_ids.len(),
+                    expected: num_batches,
+                    ingester_address
+                },
             );
-            trace!(?batches, schema=?perform_query.schema(), "Detailed from ingester");
 
-            // TODO split chunks up based on table partition, gather
-            // sequence numbers per partition
-            let parquet_max_sequence_number = None;
-
-            let ingester_partition = IngesterPartition::try_new(
-                ChunkId::new(),
-                Arc::clone(&namespace_name),
-                Arc::clone(&table_name),
-                Arc::clone(&expected_schema),
-                parquet_max_sequence_number,
-                batches,
-            )?;
-
-            partitions.push(Arc::new(ingester_partition));
+            for (partition_id, (state, batches)) in partitions {
+                // do NOT filter out empty partitions, because the caller of this function needs the attached metadata
+                // to select the right parquet files and tombstones
+                let ingester_partition = IngesterPartition::try_new(
+                    ChunkId::new(),
+                    Arc::clone(&namespace_name),
+                    Arc::clone(&table_name),
+                    PartitionId::new(partition_id),
+                    Arc::clone(&expected_schema),
+                    state.parquet_max_sequence_number.map(SequenceNumber::new),
+                    state.tombstone_max_sequence_number.map(SequenceNumber::new),
+                    batches,
+                )?;
+                ingester_partitions.push(Arc::new(ingester_partition));
+            }
         }
-        Ok(partitions)
+
+        ingester_partitions.sort_by_key(|p| p.partition_id);
+        Ok(ingester_partitions)
     }
 }
@@ -192,11 +261,12 @@ pub struct IngesterPartition {
     chunk_id: ChunkId,
     namespace_name: Arc<str>,
     table_name: Arc<str>,
+    partition_id: PartitionId,
     schema: Arc<Schema>,
 
     /// Maximum sequence number of persisted data for this partition in the ingester
-    /// TODO make this not an option
     parquet_max_sequence_number: Option<SequenceNumber>,
+    tombstone_max_sequence_number: Option<SequenceNumber>,
     batches: Vec<RecordBatch>,
 }
@@ -204,12 +274,15 @@ pub struct IngesterPartition {
 impl IngesterPartition {
     /// Creates a new IngesterPartition, translating the passed
     /// `RecordBatches` into the correct types
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new(
         chunk_id: ChunkId,
         namespace_name: Arc<str>,
         table_name: Arc<str>,
+        partition_id: PartitionId,
         expected_schema: Arc<Schema>,
         parquet_max_sequence_number: Option<SequenceNumber>,
+        tombstone_max_sequence_number: Option<SequenceNumber>,
         batches: Vec<RecordBatch>,
     ) -> Result<Self> {
         // ensure that the schema of the batches matches the required
@@ -227,8 +300,10 @@ impl IngesterPartition {
             chunk_id,
             namespace_name,
             table_name,
+            partition_id,
             schema: expected_schema,
             parquet_max_sequence_number,
+            tombstone_max_sequence_number,
             batches,
         })
     }
@@ -264,7 +339,7 @@ impl QueryChunk for IngesterPartition {
         ChunkAddr {
             db_name: Arc::clone(&self.namespace_name),
             table_name: Arc::clone(&self.table_name),
-            partition_key: Arc::from("TODO PARTITION KEY"),
+            partition_key: Arc::from(self.partition_id.to_string()),
             chunk_id: self.chunk_id,
         }
     }
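The three new error variants encode invariants on the response metadata: one partition id per batch, no ids left over, and every id pre-announced in `unpersisted_partitions`. The same invariants as a standalone predicate (the function and the generic `S` for the per-partition state are illustrative; the real code returns the specific error instead of `false`):

    use std::collections::HashMap;

    fn response_is_consistent<S>(
        batch_partition_ids: &[i64],
        unpersisted_partitions: &HashMap<i64, S>,
        num_batches: usize,
    ) -> bool {
        // MissingPartitionId / TooManyPartitionIds: exactly one id per batch
        batch_partition_ids.len() == num_batches
            // UnknownPartitionId: every batch maps to an advertised partition
            && batch_partition_ids
                .iter()
                .all(|id| unpersisted_partitions.contains_key(id))
    }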
@@ -377,20 +452,20 @@ fn ensure_schema(batch: RecordBatch, expected_schema: &Schema) -> Result<RecordBatch> {
     ) -> Result<Vec<Arc<IngesterPartition>>, Error> {
         let namespace = Arc::from("namespace");
         let table = Arc::from("table");
         let columns = vec![String::from("col")];
-        let schema = Arc::new(SchemaBuilder::new().build().unwrap());
+        let schema = schema();
         ingester_conn
             .partitions(namespace, table, columns, &Predicate::default(), schema)
             .await
     }
 
+    fn schema() -> Arc<Schema> {
+        Arc::new(
+            SchemaBuilder::new()
+                .influx_field("foo", InfluxFieldType::Integer)
+                .timestamp()
+                .build()
+                .unwrap(),
+        )
+    }
+
+    fn lp_to_record_batch(lp: &str) -> RecordBatch {
+        lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap()
+    }
+
+    #[derive(Debug)]
+    struct MockQueryData {
+        results: Vec<Result<RecordBatch, FlightError>>,
+        app_metadata: IngesterQueryResponseMetadata,
+        schema: Arc<Schema>,
+    }
+
+    #[async_trait]
+    impl QueryData for MockQueryData {
+        async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
+            if self.results.is_empty() {
+                Ok(None)
+            } else {
+                self.results.remove(0).map(Some)
+            }
+        }
+
+        fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
+            &self.app_metadata
+        }
+
+        fn schema(&self) -> Arc<Schema> {
+            Arc::clone(&self.schema)
+        }
+    }
+
     #[derive(Debug)]
     struct MockFlightClient {
-        responses: Mutex<
-            HashMap<String, Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError>>,
-        >,
+        responses: Mutex<HashMap<String, Result<MockQueryData, FlightClientError>>>,
     }
 
     impl MockFlightClient {
@@ -461,20 +815,10 @@ mod tests {
         }
     }
 
-    impl<const N: usize>
-        From<
-            [(
-                &'static str,
-                Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError>,
-            ); N],
-        > for MockFlightClient
+    impl<const N: usize> From<[(&'static str, Result<MockQueryData, FlightClientError>); N]>
+        for MockFlightClient
     {
-        fn from(
-            responses: [(
-                &'static str,
-                Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError>,
-            ); N],
-        ) -> Self {
+        fn from(responses: [(&'static str, Result<MockQueryData, FlightClientError>); N]) -> Self {
             Self {
                 responses: Mutex::new(
                     responses
@@ -492,12 +836,13 @@
             &self,
             ingester_address: &str,
             _request: IngesterQueryRequest,
-        ) -> Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError> {
+        ) -> Result<Box<dyn QueryData>, FlightClientError> {
            self.responses
                 .lock()
                 .await
                 .remove(ingester_address)
                 .expect("Response not mocked")
+                .map(|query_data| Box::new(query_data) as _)
         }
     }
@@ -514,13 +859,16 @@
 
         for case in cases {
             let parquet_max_sequence_number = None;
+            let tombstone_max_sequence_number = None;
             // Construct a partition and ensure it doesn't error
             let ingester_partition = IngesterPartition::try_new(
                 ChunkId::new(),
                 "ns".into(),
                 "table".into(),
+                PartitionId::new(1),
                 Arc::clone(&expected_schema),
                 parquet_max_sequence_number,
+                tombstone_max_sequence_number,
                 vec![case],
             )
             .unwrap();
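For context, this is roughly how the new test doubles compose: the mock carries both the batches and the metadata that the dismantling code keys on. A sketch only; `PartitionStatus` as the name of the per-partition metadata message and its `Default` construction are assumptions, and the line-protocol literal is made up:

    use std::collections::HashMap;

    // Partition 1 ships one batch; partition 2 is advertised but empty and
    // must still surface as an IngesterPartition carrying its metadata.
    let app_metadata = IngesterQueryResponseMetadata {
        unpersisted_partitions: HashMap::from([
            (
                1,
                PartitionStatus {
                    parquet_max_sequence_number: Some(10),
                    tombstone_max_sequence_number: None,
                },
            ),
            (
                2,
                PartitionStatus {
                    parquet_max_sequence_number: None,
                    tombstone_max_sequence_number: Some(3),
                },
            ),
        ]),
        batch_partition_ids: vec![1],
        ..Default::default()
    };

    let mock = MockQueryData {
        results: vec![Ok(lp_to_record_batch("table foo=1i 1"))],
        app_metadata,
        schema: schema(),
    };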