From b294bb98aa09e484f57c4aad83d86fb48624d065 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 11 Oct 2022 16:58:41 +0200 Subject: [PATCH] refactor: move query types to query_handler Moves types that are only used for handling queries to the query_handler module. --- ingester/src/data.rs | 293 +---------------------------- ingester/src/data/partition.rs | 13 +- ingester/src/data/table.rs | 6 +- ingester/src/handler.rs | 4 +- ingester/src/querier_handler.rs | 301 +++++++++++++++++++++++++++++- ingester/src/server/grpc.rs | 5 +- query_tests/src/scenarios/util.rs | 7 +- 7 files changed, 309 insertions(+), 320 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 66f71159bb..d1ec7d39a2 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1,15 +1,12 @@ //! Data for the lifecycle of the Ingester -use std::{collections::BTreeMap, pin::Pin, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; -use arrow::{error::ArrowError, record_batch::RecordBatch}; -use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId}; -use datafusion::physical_plan::SendableRecordBatchStream; + use dml::DmlOperation; -use futures::{Stream, StreamExt}; use iox_catalog::interface::{get_table_schema_by_id, Catalog}; use iox_query::exec::Executor; use iox_time::SystemProvider; @@ -30,11 +27,7 @@ pub mod partition; pub(crate) mod shard; pub(crate) mod table; -use self::{ - partition::{resolver::PartitionProvider, PartitionStatus}, - shard::ShardData, - table::TableName, -}; +use self::{partition::resolver::PartitionProvider, shard::ShardData, table::TableName}; #[cfg(test)] mod triggers; @@ -482,172 +475,24 @@ impl Persister for IngesterData { } } -/// Stream of snapshots. -/// -/// Every snapshot is a dedicated [`SendableRecordBatchStream`]. -pub(crate) type SnapshotStream = - Pin> + Send>>; - -/// Response data for a single partition. -pub(crate) struct IngesterQueryPartition { - /// Stream of snapshots. - snapshots: SnapshotStream, - - /// Partition ID. - id: PartitionId, - - /// Partition persistence status. - status: PartitionStatus, -} - -impl std::fmt::Debug for IngesterQueryPartition { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("IngesterQueryPartition") - .field("snapshots", &"") - .field("id", &self.id) - .field("status", &self.status) - .finish() - } -} - -impl IngesterQueryPartition { - pub(crate) fn new(snapshots: SnapshotStream, id: PartitionId, status: PartitionStatus) -> Self { - Self { - snapshots, - id, - status, - } - } -} - -/// Stream of partitions in this response. -pub(crate) type IngesterQueryPartitionStream = - Pin> + Send>>; - -/// Response streams for querier<>ingester requests. -/// -/// The data structure is constructed to allow lazy/streaming data generation. For easier -/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method. -pub struct IngesterQueryResponse { - /// Stream of partitions. - partitions: IngesterQueryPartitionStream, -} - -impl std::fmt::Debug for IngesterQueryResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("IngesterQueryResponse") - .field("partitions", &"") - .finish() - } -} - -impl IngesterQueryResponse { - /// Make a response - pub(crate) fn new(partitions: IngesterQueryPartitionStream) -> Self { - Self { partitions } - } - - /// Flattens the data according to the wire protocol. - pub fn flatten(self) -> FlatIngesterQueryResponseStream { - self.partitions - .flat_map(|partition_res| match partition_res { - Ok(partition) => { - let head = futures::stream::once(async move { - Ok(FlatIngesterQueryResponse::StartPartition { - partition_id: partition.id, - status: partition.status, - }) - }); - let tail = partition - .snapshots - .flat_map(|snapshot_res| match snapshot_res { - Ok(snapshot) => { - let schema = Arc::new(optimize_schema(&snapshot.schema())); - - let schema_captured = Arc::clone(&schema); - let head = futures::stream::once(async { - Ok(FlatIngesterQueryResponse::StartSnapshot { - schema: schema_captured, - }) - }); - - let tail = snapshot.map(move |batch_res| match batch_res { - Ok(batch) => Ok(FlatIngesterQueryResponse::RecordBatch { - batch: optimize_record_batch(&batch, Arc::clone(&schema))?, - }), - Err(e) => Err(e), - }); - - head.chain(tail).boxed() - } - Err(e) => futures::stream::once(async { Err(e) }).boxed(), - }); - - head.chain(tail).boxed() - } - Err(e) => futures::stream::once(async { Err(e) }).boxed(), - }) - .boxed() - } -} - -/// Flattened version of [`IngesterQueryResponse`]. -pub(crate) type FlatIngesterQueryResponseStream = - Pin> + Send>>; - -/// Element within the flat wire protocol. -#[derive(Debug, PartialEq)] -pub enum FlatIngesterQueryResponse { - /// Start a new partition. - StartPartition { - /// Partition ID. - partition_id: PartitionId, - - /// Partition persistence status. - status: PartitionStatus, - }, - - /// Start a new snapshot. - /// - /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) - /// message. - StartSnapshot { - /// Snapshot schema. - schema: Arc, - }, - - /// Add a record batch to the snapshot that was announced by the last - /// [`StartSnapshot`](Self::StartSnapshot) message. - RecordBatch { - /// Record batch. - batch: RecordBatch, - }, -} - #[cfg(test)] mod tests { - use std::{ - ops::DerefMut, - sync::Arc, - task::{Context, Poll}, - time::Duration, - }; + use std::{ops::DerefMut, sync::Arc, time::Duration}; - use arrow::datatypes::SchemaRef; use assert_matches::assert_matches; use data_types::{ ColumnId, ColumnSet, CompactionLevel, DeletePredicate, NamespaceSchema, NonEmptyString, ParquetFileParams, Sequence, Timestamp, TimestampRange, }; - use datafusion::physical_plan::RecordBatchStream; + use dml::{DmlDelete, DmlMeta, DmlWrite}; use futures::TryStreamExt; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; use iox_time::Time; use metric::{MetricObserver, Observation}; - use mutable_batch_lp::{lines_to_batches, test_helpers::lp_to_mutable_batch}; + use mutable_batch_lp::lines_to_batches; use object_store::memory::InMemory; - use schema::selection::Selection; + use uuid::Uuid; use super::*; @@ -1506,128 +1351,4 @@ mod tests { assert_eq!(progresses, expected_progresses); } - - #[tokio::test] - async fn test_ingester_query_response_flatten() { - let batch_1_1 = lp_to_batch("table x=1 0"); - let batch_1_2 = lp_to_batch("table x=2 1"); - let batch_2 = lp_to_batch("table y=1 10"); - let batch_3 = lp_to_batch("table z=1 10"); - - let schema_1 = batch_1_1.schema(); - let schema_2 = batch_2.schema(); - let schema_3 = batch_3.schema(); - - let response = IngesterQueryResponse::new(Box::pin(futures::stream::iter([ - Ok(IngesterQueryPartition::new( - Box::pin(futures::stream::iter([ - Ok(Box::pin(TestRecordBatchStream::new( - vec![ - Ok(batch_1_1.clone()), - Err(ArrowError::NotYetImplemented("not yet implemeneted".into())), - Ok(batch_1_2.clone()), - ], - Arc::clone(&schema_1), - )) as _), - Err(ArrowError::InvalidArgumentError("invalid arg".into())), - Ok(Box::pin(TestRecordBatchStream::new( - vec![Ok(batch_2.clone())], - Arc::clone(&schema_2), - )) as _), - Ok(Box::pin(TestRecordBatchStream::new(vec![], Arc::clone(&schema_3))) as _), - ])), - PartitionId::new(2), - PartitionStatus { - parquet_max_sequence_number: None, - }, - )), - Err(ArrowError::IoError("some io error".into())), - Ok(IngesterQueryPartition::new( - Box::pin(futures::stream::iter([])), - PartitionId::new(1), - PartitionStatus { - parquet_max_sequence_number: None, - }, - )), - ]))); - - let actual: Vec<_> = response.flatten().collect().await; - let expected = vec![ - Ok(FlatIngesterQueryResponse::StartPartition { - partition_id: PartitionId::new(2), - status: PartitionStatus { - parquet_max_sequence_number: None, - }, - }), - Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_1 }), - Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_1 }), - Err(ArrowError::NotYetImplemented("not yet implemeneted".into())), - Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_2 }), - Err(ArrowError::InvalidArgumentError("invalid arg".into())), - Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_2 }), - Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_2 }), - Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_3 }), - Err(ArrowError::IoError("some io error".into())), - Ok(FlatIngesterQueryResponse::StartPartition { - partition_id: PartitionId::new(1), - status: PartitionStatus { - parquet_max_sequence_number: None, - }, - }), - ]; - - assert_eq!(actual.len(), expected.len()); - for (actual, expected) in actual.into_iter().zip(expected) { - match (actual, expected) { - (Ok(actual), Ok(expected)) => { - assert_eq!(actual, expected); - } - (Err(_), Err(_)) => { - // cannot compare `ArrowError`, but it's unlikely that someone changed the error - } - (Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"), - (Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"), - } - } - } - - fn lp_to_batch(lp: &str) -> RecordBatch { - lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap() - } - - pub struct TestRecordBatchStream { - schema: SchemaRef, - batches: Vec>, - } - - impl TestRecordBatchStream { - pub fn new(batches: Vec>, schema: SchemaRef) -> Self { - Self { schema, batches } - } - } - - impl RecordBatchStream for TestRecordBatchStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - } - - impl futures::Stream for TestRecordBatchStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - if self.batches.is_empty() { - Poll::Ready(None) - } else { - Poll::Ready(Some(self.batches.remove(0))) - } - } - - fn size_hint(&self) -> (usize, Option) { - (self.batches.len(), Some(self.batches.len())) - } - } } diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 2adfa2582c..6e31899d82 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -14,7 +14,7 @@ use self::{ buffer::{BufferBatch, DataBuffer}, resolver::DeferredSortKey, }; -use crate::query::QueryableBatch; +use crate::{querier_handler::PartitionStatus, query::QueryableBatch}; use super::table::TableName; @@ -30,17 +30,6 @@ pub(crate) struct UnpersistedPartitionData { pub(crate) partition_status: PartitionStatus, } -/// Status of a partition that has unpersisted data. -/// -/// Note that this structure is specific to a partition (which itself is bound to a table and -/// shard)! -#[derive(Debug, Clone, PartialEq, Eq)] -#[allow(missing_copy_implementations)] -pub struct PartitionStatus { - /// Max sequence number persisted - pub parquet_max_sequence_number: Option, -} - /// PersistingBatch contains all needed info and data for creating /// a parquet file for given set of SnapshotBatches #[derive(Debug, PartialEq, Clone)] diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 357c3edd6c..709055dd88 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -6,10 +6,8 @@ use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId use mutable_batch::MutableBatch; use write_summary::ShardProgress; -use super::partition::{ - resolver::PartitionProvider, PartitionData, PartitionStatus, UnpersistedPartitionData, -}; -use crate::lifecycle::LifecycleHandle; +use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData}; +use crate::{lifecycle::LifecycleHandle, querier_handler::PartitionStatus}; /// A double-referenced map where [`PartitionData`] can be looked up by /// [`PartitionKey`], or ID. diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 67b34342dd..981a43cd57 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -30,11 +30,11 @@ use crate::{ data::{ partition::resolver::{CatalogPartitionResolver, PartitionCache, PartitionProvider}, shard::ShardData, - IngesterData, IngesterQueryResponse, + IngesterData, }, lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager}, poison::PoisonCabinet, - querier_handler::prepare_data_to_querier, + querier_handler::{prepare_data_to_querier, IngesterQueryResponse}, stream_handler::{ handler::SequencedStreamHandler, sink_adaptor::IngestSinkAdaptor, sink_instrumentation::SinkInstrumentation, PeriodicWatermarkFetcher, diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 88371e2c40..59996f94cf 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -1,10 +1,13 @@ //! Handle all requests from Querier -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; +use arrow::{error::ArrowError, record_batch::RecordBatch}; +use arrow_util::optimize::{optimize_record_batch, optimize_schema}; +use data_types::{PartitionId, SequenceNumber}; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_util::MemoryStream; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use generated_types::ingester::IngesterQueryRequest; use observability_deps::tracing::debug; use schema::selection::Selection; @@ -13,7 +16,7 @@ use snafu::{ensure, Snafu}; use crate::{ data::{ namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName, - IngesterData, IngesterQueryPartition, IngesterQueryResponse, + IngesterData, }, query::QueryableBatch, }; @@ -47,6 +50,159 @@ pub enum Error { /// A specialized `Error` for Ingester's Query errors pub type Result = std::result::Result; +/// Stream of snapshots. +/// +/// Every snapshot is a dedicated [`SendableRecordBatchStream`]. +pub(crate) type SnapshotStream = + Pin> + Send>>; + +/// Status of a partition that has unpersisted data. +/// +/// Note that this structure is specific to a partition (which itself is bound to a table and +/// shard)! +#[derive(Debug, Clone, PartialEq, Eq)] +#[allow(missing_copy_implementations)] +pub struct PartitionStatus { + /// Max sequence number persisted + pub parquet_max_sequence_number: Option, +} + +/// Response data for a single partition. +pub(crate) struct IngesterQueryPartition { + /// Stream of snapshots. + snapshots: SnapshotStream, + + /// Partition ID. + id: PartitionId, + + /// Partition persistence status. + status: PartitionStatus, +} + +impl std::fmt::Debug for IngesterQueryPartition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IngesterQueryPartition") + .field("snapshots", &"") + .field("id", &self.id) + .field("status", &self.status) + .finish() + } +} + +impl IngesterQueryPartition { + pub(crate) fn new(snapshots: SnapshotStream, id: PartitionId, status: PartitionStatus) -> Self { + Self { + snapshots, + id, + status, + } + } +} + +/// Stream of partitions in this response. +pub(crate) type IngesterQueryPartitionStream = + Pin> + Send>>; + +/// Response streams for querier<>ingester requests. +/// +/// The data structure is constructed to allow lazy/streaming data generation. For easier +/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method. +pub struct IngesterQueryResponse { + /// Stream of partitions. + partitions: IngesterQueryPartitionStream, +} + +impl std::fmt::Debug for IngesterQueryResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IngesterQueryResponse") + .field("partitions", &"") + .finish() + } +} + +impl IngesterQueryResponse { + /// Make a response + pub(crate) fn new(partitions: IngesterQueryPartitionStream) -> Self { + Self { partitions } + } + + /// Flattens the data according to the wire protocol. + pub fn flatten(self) -> FlatIngesterQueryResponseStream { + self.partitions + .flat_map(|partition_res| match partition_res { + Ok(partition) => { + let head = futures::stream::once(async move { + Ok(FlatIngesterQueryResponse::StartPartition { + partition_id: partition.id, + status: partition.status, + }) + }); + let tail = partition + .snapshots + .flat_map(|snapshot_res| match snapshot_res { + Ok(snapshot) => { + let schema = Arc::new(optimize_schema(&snapshot.schema())); + + let schema_captured = Arc::clone(&schema); + let head = futures::stream::once(async { + Ok(FlatIngesterQueryResponse::StartSnapshot { + schema: schema_captured, + }) + }); + + let tail = snapshot.map(move |batch_res| match batch_res { + Ok(batch) => Ok(FlatIngesterQueryResponse::RecordBatch { + batch: optimize_record_batch(&batch, Arc::clone(&schema))?, + }), + Err(e) => Err(e), + }); + + head.chain(tail).boxed() + } + Err(e) => futures::stream::once(async { Err(e) }).boxed(), + }); + + head.chain(tail).boxed() + } + Err(e) => futures::stream::once(async { Err(e) }).boxed(), + }) + .boxed() + } +} + +/// Flattened version of [`IngesterQueryResponse`]. +pub(crate) type FlatIngesterQueryResponseStream = + Pin> + Send>>; + +/// Element within the flat wire protocol. +#[derive(Debug, PartialEq)] +pub enum FlatIngesterQueryResponse { + /// Start a new partition. + StartPartition { + /// Partition ID. + partition_id: PartitionId, + + /// Partition persistence status. + status: PartitionStatus, + }, + + /// Start a new snapshot. + /// + /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition) + /// message. + StartSnapshot { + /// Snapshot schema. + schema: Arc, + }, + + /// Add a record batch to the snapshot that was announced by the last + /// [`StartSnapshot`](Self::StartSnapshot) message. + RecordBatch { + /// Record batch. + batch: RecordBatch, + }, +} + /// Return data to send as a response back to the Querier per its request pub async fn prepare_data_to_querier( ingest_data: &Arc, @@ -189,19 +345,106 @@ fn prepare_data_to_querier_for_partition( #[cfg(test)] mod tests { - use arrow::{array::new_null_array, record_batch::RecordBatch}; + use std::task::{Context, Poll}; + + use arrow::{array::new_null_array, datatypes::SchemaRef, record_batch::RecordBatch}; use arrow_util::assert_batches_sorted_eq; use assert_matches::assert_matches; - use datafusion::logical_plan::{col, lit}; + use datafusion::{ + logical_plan::{col, lit}, + physical_plan::RecordBatchStream, + }; use futures::TryStreamExt; + use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use predicate::Predicate; use schema::merge::SchemaMerger; use super::*; - use crate::{ - data::FlatIngesterQueryResponse, - test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE}, - }; + use crate::test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE}; + + #[tokio::test] + async fn test_ingester_query_response_flatten() { + let batch_1_1 = lp_to_batch("table x=1 0"); + let batch_1_2 = lp_to_batch("table x=2 1"); + let batch_2 = lp_to_batch("table y=1 10"); + let batch_3 = lp_to_batch("table z=1 10"); + + let schema_1 = batch_1_1.schema(); + let schema_2 = batch_2.schema(); + let schema_3 = batch_3.schema(); + + let response = IngesterQueryResponse::new(Box::pin(futures::stream::iter([ + Ok(IngesterQueryPartition::new( + Box::pin(futures::stream::iter([ + Ok(Box::pin(TestRecordBatchStream::new( + vec![ + Ok(batch_1_1.clone()), + Err(ArrowError::NotYetImplemented("not yet implemeneted".into())), + Ok(batch_1_2.clone()), + ], + Arc::clone(&schema_1), + )) as _), + Err(ArrowError::InvalidArgumentError("invalid arg".into())), + Ok(Box::pin(TestRecordBatchStream::new( + vec![Ok(batch_2.clone())], + Arc::clone(&schema_2), + )) as _), + Ok(Box::pin(TestRecordBatchStream::new(vec![], Arc::clone(&schema_3))) as _), + ])), + PartitionId::new(2), + PartitionStatus { + parquet_max_sequence_number: None, + }, + )), + Err(ArrowError::IoError("some io error".into())), + Ok(IngesterQueryPartition::new( + Box::pin(futures::stream::iter([])), + PartitionId::new(1), + PartitionStatus { + parquet_max_sequence_number: None, + }, + )), + ]))); + + let actual: Vec<_> = response.flatten().collect().await; + let expected = vec![ + Ok(FlatIngesterQueryResponse::StartPartition { + partition_id: PartitionId::new(2), + status: PartitionStatus { + parquet_max_sequence_number: None, + }, + }), + Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_1 }), + Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_1 }), + Err(ArrowError::NotYetImplemented("not yet implemeneted".into())), + Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_2 }), + Err(ArrowError::InvalidArgumentError("invalid arg".into())), + Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_2 }), + Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_2 }), + Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_3 }), + Err(ArrowError::IoError("some io error".into())), + Ok(FlatIngesterQueryResponse::StartPartition { + partition_id: PartitionId::new(1), + status: PartitionStatus { + parquet_max_sequence_number: None, + }, + }), + ]; + + assert_eq!(actual.len(), expected.len()); + for (actual, expected) in actual.into_iter().zip(expected) { + match (actual, expected) { + (Ok(actual), Ok(expected)) => { + assert_eq!(actual, expected); + } + (Err(_), Err(_)) => { + // cannot compare `ArrowError`, but it's unlikely that someone changed the error + } + (Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"), + (Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"), + } + } + } #[tokio::test] async fn test_prepare_data_to_querier() { @@ -358,6 +601,46 @@ mod tests { } } + pub struct TestRecordBatchStream { + schema: SchemaRef, + batches: Vec>, + } + + impl TestRecordBatchStream { + pub fn new(batches: Vec>, schema: SchemaRef) -> Self { + Self { schema, batches } + } + } + + impl RecordBatchStream for TestRecordBatchStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + } + + impl futures::Stream for TestRecordBatchStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + if self.batches.is_empty() { + Poll::Ready(None) + } else { + Poll::Ready(Some(self.batches.remove(0))) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.batches.len(), Some(self.batches.len())) + } + } + + fn lp_to_batch(lp: &str) -> RecordBatch { + lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap() + } + /// Convert [`IngesterQueryResponse`] to a set of [`RecordBatch`]es. /// /// If the response contains multiple snapshots, this will merge the schemas into a single one and create diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs index 8cbd26afe1..3bf785843d 100644 --- a/ingester/src/server/grpc.rs +++ b/ingester/src/server/grpc.rs @@ -30,8 +30,8 @@ use trace::ctx::SpanContext; use write_summary::WriteSummary; use crate::{ - data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream}, handler::IngestHandler, + querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream}, }; /// This type is responsible for managing all gRPC services exposed by @@ -464,8 +464,9 @@ mod tests { use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use schema::selection::Selection; + use crate::querier_handler::PartitionStatus; + use super::*; - use crate::data::partition::PartitionStatus; #[tokio::test] async fn test_get_stream_empty() { diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 293280c066..477503504b 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -14,12 +14,9 @@ use generated_types::{ }; use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError}; use ingester::{ - data::{ - partition::resolver::CatalogPartitionResolver, FlatIngesterQueryResponse, IngesterData, - IngesterQueryResponse, Persister, - }, + data::{partition::resolver::CatalogPartitionResolver, IngesterData, Persister}, lifecycle::mock_handle::MockLifecycleHandle, - querier_handler::prepare_data_to_querier, + querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse, IngesterQueryResponse}, }; use iox_catalog::interface::get_schema_by_name; use iox_query::exec::{Executor, ExecutorConfig};