diff --git a/generated_types/protos/influxdata/iox/ingester/v1/query.proto b/generated_types/protos/influxdata/iox/ingester/v1/query.proto index a07d79b805..cb163b2423 100644 --- a/generated_types/protos/influxdata/iox/ingester/v1/query.proto +++ b/generated_types/protos/influxdata/iox/ingester/v1/query.proto @@ -38,8 +38,7 @@ message IngesterQueryRequest { } // Metadata that the ingester provides to the query service along with the results. Serialized -// in the FlightData's app_metadata for the schema that is returned as the first item from the -// Arrow Flight doGet request. +// in every FlightData's app_metadata. message IngesterQueryResponseMetadata { // There was no field 1, oops. reserved 1; @@ -65,6 +64,18 @@ message IngesterQueryResponseMetadata { // Map each record batch to a partition ID. repeated int64 batch_partition_ids = 6; + + // Partition id for this batch. + // + // This field is currently NOT used by the ingester but will be soon. + int64 partition_id = 7; + + // Optional partition status. + // + // If this is given, then no schema and no batch will be part of this FlightData object. + // + // This field is currently NOT used by the ingester but will be soon. + PartitionStatus status = 8; } // Status of a partition that has unpersisted data. 
diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs index 21cefdb12e..99339d75eb 100644 --- a/ingester/src/server/grpc.rs +++ b/ingester/src/server/grpc.rs @@ -398,6 +398,7 @@ impl GetStream { }) .collect(), batch_partition_ids: batch_partition_ids.into_iter().map(|id| id.get()).collect(), + ..Default::default() }; prost::Message::encode(&app_metadata, &mut bytes).context(SerializationSnafu)?; schema_flight_data.app_metadata = bytes.to_vec(); diff --git a/querier/src/ingester/flight_client.rs b/querier/src/ingester/flight_client.rs index 81f0709e69..2841d49c31 100644 --- a/querier/src/ingester/flight_client.rs +++ b/querier/src/ingester/flight_client.rs @@ -1,6 +1,7 @@ use arrow::{datatypes::Schema, record_batch::RecordBatch}; use async_trait::async_trait; use client_util::connection::{self, Connection}; +use data_types::PartitionId; use generated_types::ingester::IngesterQueryRequest; use influxdb_iox_client::flight::{ generated_types as proto, @@ -8,12 +9,7 @@ use influxdb_iox_client::flight::{ }; use observability_deps::tracing::debug; use snafu::{ResultExt, Snafu}; -use std::{ - collections::HashMap, - fmt::Debug, - ops::{Deref, DerefMut}, - sync::Arc, -}; +use std::{collections::HashMap, fmt::Debug, ops::DerefMut, sync::Arc}; pub use influxdb_iox_client::flight::Error as FlightError; @@ -119,6 +115,8 @@ impl FlightClient for FlightClientImpl { inner: perform_query, schema, app_metadata, + batch_counter: 0, + state: Default::default(), })) } } @@ -128,15 +126,11 @@ impl FlightClient for FlightClientImpl { /// This is mostly the same as [`PerformQuery`] but allows some easier mocking. #[async_trait] pub trait QueryData: Debug + Send + 'static { - /// Returns the next `RecordBatch` available for this query, or `None` if + /// Returns the next [`LowLevelMessage`] available for this query, or `None` if /// there are no further results available. 
- async fn next(&mut self) -> Result, FlightError>; - - /// App metadata that was part of the response. - fn app_metadata(&self) -> &proto::IngesterQueryResponseMetadata; - - /// Schema. - fn schema(&self) -> Arc; + async fn next( + &mut self, + ) -> Result, FlightError>; } #[async_trait] @@ -144,16 +138,67 @@ impl QueryData for Box where T: QueryData + ?Sized, { - async fn next(&mut self) -> Result, FlightError> { + async fn next( + &mut self, + ) -> Result, FlightError> { self.deref_mut().next().await } +} - fn app_metadata(&self) -> &proto::IngesterQueryResponseMetadata { - self.deref().app_metadata() - } +/// Protocol state. +/// +/// ```text +/// +/// [NoPartitionYet] +/// | +/// +-----------------------o +/// | | +/// V | +/// [NewPartition]<-----------o | +/// | | | +/// V | | +/// [PartitionAnnounced]<--o | | +/// | | | | +/// V | | | +/// [SchemaAnnounced] | | | +/// | | | | +/// V | | | +/// [BatchTransmitted] | | | +/// | | | | +/// +-----------------+--+ | +/// | | +/// | o---------------------o +/// | | +/// V V +/// [End]<--o +/// | | +/// o-----o +/// +/// ``` +#[derive(Debug)] +enum PerformQueryAdapterState { + NoPartitionYet, + NewPartition { + partition_id: PartitionId, + batch: RecordBatch, + }, + PartitionAnnounced { + partition_id: PartitionId, + batch: RecordBatch, + }, + SchemaAnnounced { + partition_id: PartitionId, + batch: RecordBatch, + }, + BatchTransmitted { + partition_id: PartitionId, + }, + End, +} - fn schema(&self) -> Arc { - self.deref().schema() +impl Default for PerformQueryAdapterState { + fn default() -> Self { + Self::NoPartitionYet } } @@ -162,18 +207,25 @@ struct PerformQueryAdapter { inner: PerformQuery, app_metadata: proto::IngesterQueryResponseMetadata, schema: Arc, + batch_counter: usize, + state: PerformQueryAdapterState, } -#[async_trait] -impl QueryData for PerformQueryAdapter { - async fn next(&mut self) -> Result, FlightError> { +impl PerformQueryAdapter { + /// Get next batch from underlying [`PerformQuery`] 
alongside with a batch index (which in turn can be used to + /// look up the batch partition). + /// + /// Returns `Ok(None)` if the stream has ended. + async fn next_batch(&mut self) -> Result, FlightError> { loop { match self.inner.next().await? { None => { return Ok(None); } Some((LowLevelMessage::RecordBatch(batch), _)) => { - return Ok(Some(batch)); + let pos = self.batch_counter; + self.batch_counter += 1; + return Ok(Some((batch, pos))); } // ignore all other message types for now Some((LowLevelMessage::None | LowLevelMessage::Schema(_), _)) => (), @@ -181,12 +233,118 @@ impl QueryData for PerformQueryAdapter { } } - fn app_metadata(&self) -> &proto::IngesterQueryResponseMetadata { - &self.app_metadata + /// Announce new partition (id and its persistence status). + fn announce_partition( + &mut self, + partition_id: PartitionId, + ) -> (LowLevelMessage, proto::IngesterQueryResponseMetadata) { + let meta = proto::IngesterQueryResponseMetadata { + partition_id: partition_id.get(), + status: Some( + self.app_metadata + .unpersisted_partitions + .remove(&partition_id.get()) + .unwrap(), + ), + ..Default::default() + }; + (LowLevelMessage::None, meta) } +} - fn schema(&self) -> Arc { - Arc::clone(&self.schema) +#[async_trait] +impl QueryData for PerformQueryAdapter { + async fn next( + &mut self, + ) -> Result, FlightError> { + loop { + match std::mem::take(&mut self.state) { + PerformQueryAdapterState::End => { + if let Some(partition_id) = + self.app_metadata.unpersisted_partitions.keys().next() + { + let partition_id = PartitionId::new(*partition_id); + return Ok(Some(self.announce_partition(partition_id))); + } else { + return Ok(None); + } + } + PerformQueryAdapterState::NoPartitionYet => { + let (batch, pos) = if let Some(x) = self.next_batch().await? 
{ + x + } else { + self.state = PerformQueryAdapterState::End; + continue; + }; + let partition_id = PartitionId::new(self.app_metadata.batch_partition_ids[pos]); + + self.state = PerformQueryAdapterState::NewPartition { + partition_id, + batch, + }; + continue; + } + PerformQueryAdapterState::NewPartition { + partition_id, + batch, + } => { + self.state = PerformQueryAdapterState::PartitionAnnounced { + partition_id, + batch, + }; + + return Ok(Some(self.announce_partition(partition_id))); + } + PerformQueryAdapterState::PartitionAnnounced { + partition_id, + batch, + } => { + self.state = PerformQueryAdapterState::SchemaAnnounced { + partition_id, + batch, + }; + + let meta = Default::default(); + return Ok(Some(( + LowLevelMessage::Schema(Arc::clone(&self.schema)), + meta, + ))); + } + PerformQueryAdapterState::SchemaAnnounced { + partition_id, + batch, + } => { + self.state = PerformQueryAdapterState::BatchTransmitted { partition_id }; + + let meta = Default::default(); + return Ok(Some((LowLevelMessage::RecordBatch(batch), meta))); + } + PerformQueryAdapterState::BatchTransmitted { partition_id } => { + let (batch, pos) = if let Some(x) = self.next_batch().await? 
{ + x + } else { + self.state = PerformQueryAdapterState::End; + continue; + }; + let partition_id2 = + PartitionId::new(self.app_metadata.batch_partition_ids[pos]); + + if partition_id == partition_id2 { + self.state = PerformQueryAdapterState::PartitionAnnounced { + partition_id, + batch, + }; + continue; + } else { + self.state = PerformQueryAdapterState::NewPartition { + partition_id: partition_id2, + batch, + }; + continue; + } + } + } + } } } diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 4dc3297010..c234ca0d22 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -17,6 +17,7 @@ use generated_types::{ ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest}, write_info::merge_responses, }; +use influxdb_iox_client::flight::low_level::LowLevelMessage; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, util::compute_timenanosecond_min_max, @@ -69,34 +70,6 @@ pub enum Error { source: FlightClientError, }, - #[snafu(display("Batch {} is missing partition id from '{}'", pos, ingester_address))] - MissingPartitionId { - pos: usize, - ingester_address: String, - }, - - #[snafu(display( - "Got too many partition IDs from '{}', expected {} but got {}", - ingester_address, - expected, - actual - ))] - TooManyPartitionIds { - expected: usize, - actual: usize, - ingester_address: String, - }, - - #[snafu(display( - "Got batch for partition id {} that was not marked as unpersisted from '{}'", - partition_id, - ingester_address, - ))] - UnknownPartitionId { - partition_id: i64, - ingester_address: String, - }, - #[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))] Connecting { ingester_address: String, @@ -114,6 +87,27 @@ pub enum Error { write_token: String, source: influxdb_iox_client::error::Error, }, + + #[snafu(display( + "Partition status missing for partition {partition_id}, ingester: {ingester_address}" + ))] + PartitionStatusMissing { + partition_id: 
i64, + ingester_address: String, + }, + + #[snafu(display( + "Got batch without partition information from ingester: {ingester_address}" + ))] + BatchWithoutPartition { ingester_address: String }, + + #[snafu(display( + "Duplicate partition info for partition {partition_id}, ingester: {ingester_address}" + ))] + DuplicatePartitionInfo { + partition_id: i64, + ingester_address: String, + }, } pub type Result = std::result::Result; @@ -271,17 +265,12 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result (PartitionMetadata, Vec)) - let mut partitions: HashMap<_, _> = perform_query - .app_metadata() - .unpersisted_partitions - .iter() - .map(|(id, state)| (*id, (state.clone(), vec![]))) - .collect(); + let mut partitions: HashMap<_, _> = HashMap::new(); // sort batches into partitions let mut num_batches = 0usize; - let partition_ids = perform_query.app_metadata().batch_partition_ids.clone(); - while let Some(batch) = perform_query + let mut current_partition_id = None; + while let Some((msg, md)) = perform_query .next() .await .map_err(|source| FlightClientError::Flight { source }) @@ -289,32 +278,41 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result { + // new partition announced + let partition_id = md.partition_id; + let status = md.status.context(PartitionStatusMissingSnafu { + partition_id, + ingester_address: ingester_address.as_ref(), + })?; + let existing = partitions.insert(partition_id, (status, vec![])); + ensure!( + existing.is_none(), + DuplicatePartitionInfoSnafu { + partition_id, + ingester_address: ingester_address.as_ref() + }, + ); + current_partition_id = Some(partition_id); + } + LowLevelMessage::Schema(_) => { + // can be ignored, schemas are propagated to the batches automatically by the low-level client + } + LowLevelMessage::RecordBatch(batch) => { + let partition_id = current_partition_id.context(BatchWithoutPartitionSnafu { + ingester_address: ingester_address.as_ref(), + })?; + partitions + 
.get_mut(&partition_id) + .expect("current partition should have been inserted") + .1 + .push(batch); + num_batches += 1; + } + } } debug!(%ingester_address, num_batches, "Received batches from ingester"); - trace!(?partitions, schema=?perform_query.schema(), "Detailed from ingester"); - ensure!( - num_batches == partition_ids.len(), - TooManyPartitionIdsSnafu { - actual: partition_ids.len(), - expected: num_batches, - ingester_address: ingester_address.as_ref(), - }, - ); let mut ingester_partitions = vec![]; for (partition_id, (state, batches)) in partitions { @@ -330,6 +328,7 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result TableSummary { #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap}; + use std::collections::HashMap; use arrow::{ array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray}, @@ -883,11 +882,6 @@ mod tests { results: vec![Err(FlightError::GrpcError(tonic::Status::internal( "don't know", )))], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::new(), - batch_partition_ids: vec![], - }, - schema: schema().as_arrow(), }), )]) .await, @@ -900,18 +894,7 @@ mod tests { #[tokio::test] async fn test_flight_no_partitions() { let mock_flight_client = Arc::new( - MockFlightClient::new([( - "addr1", - Ok(MockQueryData { - results: vec![], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::new(), - batch_partition_ids: vec![], - }, - schema: schema().as_arrow(), - }), - )]) - .await, + MockFlightClient::new([("addr1", Ok(MockQueryData { results: vec![] }))]).await, ); let ingester_conn = mock_flight_client.ingester_conn().await; let partitions = get_partitions(&ingester_conn).await.unwrap(); @@ -924,18 +907,17 @@ mod tests { MockFlightClient::new([( "addr1", Ok(MockQueryData { - results: vec![], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::from([( - 1, - PartitionStatus { + results: 
vec![Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 1, + status: Some(PartitionStatus { parquet_max_sequence_number: None, tombstone_max_sequence_number: None, - }, - )]), - batch_partition_ids: vec![], - }, - schema: schema().as_arrow(), + }), + ..Default::default() + }, + ))], }), )]) .await, @@ -954,142 +936,189 @@ mod tests { } #[tokio::test] - async fn test_flight_err_missing_partition_id() { - let record_batch = lp_to_record_batch("table foo=1 1"); + async fn test_flight_err_partition_status_missing() { let mock_flight_client = Arc::new( MockFlightClient::new([( "addr1", Ok(MockQueryData { - results: vec![Ok(record_batch)], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::from([( - 1, - PartitionStatus { - parquet_max_sequence_number: Some(1), - tombstone_max_sequence_number: Some(2), - }, - )]), - batch_partition_ids: vec![], - }, - schema: schema().as_arrow(), + results: vec![Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 1, + status: None, + ..Default::default() + }, + ))], }), )]) .await, ); let ingester_conn = mock_flight_client.ingester_conn().await; let err = get_partitions(&ingester_conn).await.unwrap_err(); - assert_matches!(err, Error::MissingPartitionId { .. }); + assert_matches!(err, Error::PartitionStatusMissing { .. 
}); } #[tokio::test] - async fn test_flight_err_too_many_partition_ids() { - let record_batch = lp_to_record_batch("table foo=1 1"); + async fn test_flight_err_duplicate_partition_info() { let mock_flight_client = Arc::new( MockFlightClient::new([( "addr1", Ok(MockQueryData { - results: vec![Ok(record_batch)], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::from([( - 1, - PartitionStatus { - parquet_max_sequence_number: Some(1), - tombstone_max_sequence_number: Some(2), + results: vec![ + Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 1, + status: Some(PartitionStatus { + parquet_max_sequence_number: None, + tombstone_max_sequence_number: None, + }), + ..Default::default() }, - )]), - batch_partition_ids: vec![1, 2], - }, - schema: schema().as_arrow(), + )), + Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 2, + status: Some(PartitionStatus { + parquet_max_sequence_number: None, + tombstone_max_sequence_number: None, + }), + ..Default::default() + }, + )), + Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 1, + status: Some(PartitionStatus { + parquet_max_sequence_number: None, + tombstone_max_sequence_number: None, + }), + ..Default::default() + }, + )), + ], }), )]) .await, ); let ingester_conn = mock_flight_client.ingester_conn().await; let err = get_partitions(&ingester_conn).await.unwrap_err(); - assert_matches!(err, Error::TooManyPartitionIds { .. }); + assert_matches!(err, Error::DuplicatePartitionInfo { .. 
}); } #[tokio::test] - async fn test_flight_err_unknown_partition_id() { + async fn test_flight_err_batch_without_partition() { let record_batch = lp_to_record_batch("table foo=1 1"); let mock_flight_client = Arc::new( MockFlightClient::new([( "addr1", Ok(MockQueryData { - results: vec![Ok(record_batch)], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::from([( - 1, - PartitionStatus { - parquet_max_sequence_number: Some(1), - tombstone_max_sequence_number: Some(2), - }, - )]), - batch_partition_ids: vec![42], - }, - schema: schema().as_arrow(), + results: vec![Ok(( + LowLevelMessage::RecordBatch(record_batch), + IngesterQueryResponseMetadata::default(), + ))], }), )]) .await, ); let ingester_conn = mock_flight_client.ingester_conn().await; let err = get_partitions(&ingester_conn).await.unwrap_err(); - assert_matches!(err, Error::UnknownPartitionId { .. }); + assert_matches!(err, Error::BatchWithoutPartition { .. }); } #[tokio::test] async fn test_flight_many_batches() { let record_batch_1_1 = lp_to_record_batch("table foo=1 1"); - let record_batch_1_2 = lp_to_record_batch("table foo=2 2"); + let record_batch_1_2 = lp_to_record_batch("table bar=20,foo=2 2"); let record_batch_2_1 = lp_to_record_batch("table foo=3 3"); - let record_batch_3_1 = lp_to_record_batch("table foo=4 4"); + let record_batch_3_1 = lp_to_record_batch("table baz=40,foo=4 4"); + + let schema_1_1 = record_batch_1_1.schema(); + let schema_1_2 = record_batch_1_2.schema(); + let schema_2_1 = record_batch_2_1.schema(); + let schema_3_1 = record_batch_3_1.schema(); + let mock_flight_client = Arc::new( MockFlightClient::new([ ( "addr1", Ok(MockQueryData { results: vec![ - Ok(record_batch_1_1), - Ok(record_batch_1_2), - Ok(record_batch_2_1), - ], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::from([ - ( - 1, - PartitionStatus { + Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 1, + status: 
Some(PartitionStatus { parquet_max_sequence_number: Some(11), tombstone_max_sequence_number: Some(12), - }, - ), - ( - 2, - PartitionStatus { + }), + ..Default::default() + }, + )), + Ok(( + LowLevelMessage::Schema(Arc::clone(&schema_1_1)), + IngesterQueryResponseMetadata::default(), + )), + Ok(( + LowLevelMessage::RecordBatch(record_batch_1_1), + IngesterQueryResponseMetadata::default(), + )), + Ok(( + LowLevelMessage::Schema(Arc::clone(&schema_1_2)), + IngesterQueryResponseMetadata::default(), + )), + Ok(( + LowLevelMessage::RecordBatch(record_batch_1_2), + IngesterQueryResponseMetadata::default(), + )), + Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 2, + status: Some(PartitionStatus { parquet_max_sequence_number: Some(21), tombstone_max_sequence_number: Some(22), - }, - ), - ]), - batch_partition_ids: vec![1, 1, 2], - }, - schema: schema().as_arrow(), + }), + ..Default::default() + }, + )), + Ok(( + LowLevelMessage::Schema(Arc::clone(&schema_2_1)), + IngesterQueryResponseMetadata::default(), + )), + Ok(( + LowLevelMessage::RecordBatch(record_batch_2_1), + IngesterQueryResponseMetadata::default(), + )), + ], }), ), ( "addr2", Ok(MockQueryData { - results: vec![Ok(record_batch_3_1)], - app_metadata: IngesterQueryResponseMetadata { - unpersisted_partitions: BTreeMap::from([( - 3, - PartitionStatus { - parquet_max_sequence_number: Some(31), - tombstone_max_sequence_number: Some(32), + results: vec![ + Ok(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: 3, + status: Some(PartitionStatus { + parquet_max_sequence_number: Some(31), + tombstone_max_sequence_number: Some(32), + }), + ..Default::default() }, - )]), - batch_partition_ids: vec![3], - }, - schema: schema().as_arrow(), + )), + Ok(( + LowLevelMessage::Schema(Arc::clone(&schema_3_1)), + IngesterQueryResponseMetadata::default(), + )), + Ok(( + LowLevelMessage::RecordBatch(record_batch_3_1), + IngesterQueryResponseMetadata::default(), + )), + ], }), ), 
]) @@ -1112,6 +1141,8 @@ mod tests { Some(SequenceNumber::new(12)) ); assert_eq!(p1.batches.len(), 2); + assert_eq!(p1.batches[0].schema(), schema().as_arrow()); + assert_eq!(p1.batches[1].schema(), schema().as_arrow()); let p2 = &partitions[1]; assert_eq!(p2.partition_id.get(), 2); @@ -1125,6 +1156,7 @@ mod tests { Some(SequenceNumber::new(22)) ); assert_eq!(p2.batches.len(), 1); + assert_eq!(p2.batches[0].schema(), schema().as_arrow()); let p3 = &partitions[2]; assert_eq!(p3.partition_id.get(), 3); @@ -1138,6 +1170,7 @@ mod tests { Some(SequenceNumber::new(32)) ); assert_eq!(p3.batches.len(), 1); + assert_eq!(p3.batches[0].schema(), schema().as_arrow()); } async fn get_partitions( @@ -1155,7 +1188,9 @@ mod tests { fn schema() -> Arc { Arc::new( SchemaBuilder::new() - .influx_field("col", InfluxFieldType::Integer) + .influx_field("bar", InfluxFieldType::Float) + .influx_field("baz", InfluxFieldType::Float) + .influx_field("foo", InfluxFieldType::Float) .timestamp() .build() .unwrap(), @@ -1168,28 +1203,20 @@ mod tests { #[derive(Debug)] struct MockQueryData { - results: Vec>, - app_metadata: IngesterQueryResponseMetadata, - schema: Arc, + results: Vec>, } #[async_trait] impl QueryData for MockQueryData { - async fn next(&mut self) -> Result, FlightError> { + async fn next( + &mut self, + ) -> Result, FlightError> { if self.results.is_empty() { Ok(None) } else { self.results.remove(0).map(Some) } } - - fn app_metadata(&self) -> &IngesterQueryResponseMetadata { - &self.app_metadata - } - - fn schema(&self) -> Arc { - Arc::clone(&self.schema) - } } #[derive(Debug)] diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index ec4a28cb2d..6795eb33e3 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -11,12 +11,12 @@ use data_types::{ use datafusion::physical_plan::RecordBatchStream; use datafusion_util::MemoryStream; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; -use futures::{StreamExt, 
TryStreamExt}; +use futures::TryStreamExt; use generated_types::{ influxdata::iox::ingester::v1::{IngesterQueryResponseMetadata, PartitionStatus}, ingester::IngesterQueryRequest, }; -use influxdb_iox_client::flight::Error as FlightError; +use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError}; use ingester::{ data::{IngesterData, IngesterQueryResponse, Persister, SequencerData}, lifecycle::LifecycleHandle, @@ -951,66 +951,79 @@ impl IngesterFlightClient for MockIngester { ..response }; - Ok(Box::new(QueryDataAdapter::new(response))) + Ok(Box::new(QueryDataAdapter::new(response).await)) } } /// Helper struct to present [`IngesterQueryResponse`] (produces by the ingester) as a /// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO. -#[derive(Debug)] struct QueryDataAdapter { - response: IngesterQueryResponse, - app_metadata: IngesterQueryResponseMetadata, + messages: Box + Send>, +} + +impl std::fmt::Debug for QueryDataAdapter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QueryDataAdapter").finish_non_exhaustive() + } } impl QueryDataAdapter { /// Create new adapter. /// /// This pre-calculates some data structure that we are going to need later. 
- fn new(response: IngesterQueryResponse) -> Self { - let app_metadata = IngesterQueryResponseMetadata { - unpersisted_partitions: response - .unpersisted_partitions - .iter() - .map(|(id, status)| { - ( - id.get(), - PartitionStatus { - parquet_max_sequence_number: status - .parquet_max_sequence_number - .map(|x| x.get()), - tombstone_max_sequence_number: status - .tombstone_max_sequence_number - .map(|x| x.get()), - }, - ) - }) - .collect(), - batch_partition_ids: response - .batch_partition_ids - .iter() - .map(|id| id.get()) - .collect(), - }; + async fn new(mut response: IngesterQueryResponse) -> Self { + let mut partitions: BTreeMap<_, _> = response + .unpersisted_partitions + .into_iter() + .map(|(partition_id, status)| (partition_id, (status, vec![]))) + .collect(); + + let mut id_it = response.batch_partition_ids.into_iter(); + while let Some(batch) = response.data.try_next().await.unwrap() { + let partition_id = id_it.next().unwrap(); + partitions.get_mut(&partition_id).unwrap().1.push(batch); + } + + let mut messages = vec![]; + for (partition_id, (status, batches)) in partitions { + let status = PartitionStatus { + parquet_max_sequence_number: status.parquet_max_sequence_number.map(|x| x.get()), + tombstone_max_sequence_number: status + .tombstone_max_sequence_number + .map(|x| x.get()), + }; + messages.push(( + LowLevelMessage::None, + IngesterQueryResponseMetadata { + partition_id: partition_id.get(), + status: Some(status), + ..Default::default() + }, + )); + + for batch in batches { + messages.push(( + LowLevelMessage::Schema(batch.schema()), + IngesterQueryResponseMetadata::default(), + )); + messages.push(( + LowLevelMessage::RecordBatch(batch), + IngesterQueryResponseMetadata::default(), + )); + } + } Self { - response, - app_metadata, + messages: Box::new(messages.into_iter()), } } } #[async_trait] impl IngesterFlightClientQueryData for QueryDataAdapter { - async fn next(&mut self) -> Result, FlightError> { - 
Ok(self.response.data.next().await.map(|x| x.unwrap())) - } - - fn app_metadata(&self) -> &IngesterQueryResponseMetadata { - &self.app_metadata - } - - fn schema(&self) -> Arc { - self.response.schema.as_arrow() + async fn next( + &mut self, + ) -> Result, FlightError> { + Ok(self.messages.next()) } }