test: test gRPC and stream flattening (#4873)

Ref #4849.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-06-16 13:44:59 +02:00 committed by GitHub
parent 73e1b3a47e
commit 4b945493be
2 changed files with 308 additions and 13 deletions
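For orientation, here is a minimal sketch of how the flattened stream introduced by this change is meant to be consumed. The `consume` helper below is hypothetical and assumes only the `ingester` crate types that appear in the diff that follows:

// Hypothetical consumer sketch, not part of this commit.
use futures::StreamExt;

use crate::data::{FlatIngesterQueryResponse, IngesterQueryResponse};

async fn consume(response: IngesterQueryResponse) {
    // `flatten` turns the nested partition -> snapshot -> batch structure into a
    // single flat stream of wire-protocol elements (`FlatIngesterQueryResponseStream`).
    let mut flat = response.flatten();
    while let Some(element) = flat.next().await {
        match element {
            Ok(FlatIngesterQueryResponse::StartPartition { .. }) => {
                // a new partition begins; carries its ID and persistence status
            }
            Ok(FlatIngesterQueryResponse::StartSnapshot { .. }) => {
                // all record batches until the next marker share this schema
            }
            Ok(FlatIngesterQueryResponse::RecordBatch { .. }) => {
                // payload data for the current snapshot
            }
            Err(e) => {
                // errors are forwarded inline instead of terminating the stream
                eprintln!("stream element failed: {}", e);
            }
        }
    }
}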


@@ -1523,7 +1523,7 @@ pub struct QueryableBatch {
///
/// Note that this structure is specific to a partition (which itself is bound to a table and
/// sequencer)!
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(missing_copy_implementations)]
pub struct PartitionStatus {
/// Max sequence number persisted
@@ -1599,9 +1599,7 @@ impl IngesterQueryResponse {
}
/// Flattens the data according to the wire protocol.
pub fn flatten(
self,
) -> Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>> {
pub fn flatten(self) -> FlatIngesterQueryResponseStream {
self.partitions
.flat_map(|partition_res| match partition_res {
Ok(partition) => {
@@ -1644,8 +1642,12 @@ impl IngesterQueryResponse {
}
}
/// Flattened version of [`IngesterQueryResponse`].
pub type FlatIngesterQueryResponseStream =
Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>;
/// Element within the flat wire protocol.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum FlatIngesterQueryResponse {
/// Start a new partition.
StartPartition {
@@ -1679,11 +1681,13 @@ mod tests {
partioning::DefaultPartitioner,
test_util::create_tombstone,
};
use arrow::datatypes::SchemaRef;
use arrow_util::assert_batches_sorted_eq;
use assert_matches::assert_matches;
use data_types::{
NamespaceSchema, NonEmptyString, ParquetFileParams, Sequence, TimestampRange,
};
use datafusion::physical_plan::RecordBatchStream;
use dml::{DmlDelete, DmlMeta, DmlWrite};
use futures::TryStreamExt;
use iox_catalog::{
@@ -1693,7 +1697,11 @@ mod tests {
use metric::{MetricObserver, Observation};
use mutable_batch_lp::{lines_to_batches, test_helpers::lp_to_mutable_batch};
use object_store::memory::InMemory;
use std::{ops::DerefMut, time::Duration};
use std::{
ops::DerefMut,
task::{Context, Poll},
time::Duration,
};
#[test]
fn snapshot_empty_buffer_adds_no_snapshots() {
@@ -2740,4 +2748,132 @@ mod tests {
assert_eq!(progresses, expected_progresses);
}
#[tokio::test]
async fn test_ingester_query_response_flatten() {
let batch_1_1 = lp_to_batch("table x=1 0");
let batch_1_2 = lp_to_batch("table x=2 1");
let batch_2 = lp_to_batch("table y=1 10");
let batch_3 = lp_to_batch("table z=1 10");
let schema_1 = batch_1_1.schema();
let schema_2 = batch_2.schema();
let schema_3 = batch_3.schema();
let response = IngesterQueryResponse::new(Box::pin(futures::stream::iter([
Ok(IngesterQueryPartition::new(
Box::pin(futures::stream::iter([
Ok(Box::pin(TestRecordBatchStream::new(
vec![
Ok(batch_1_1.clone()),
Err(ArrowError::NotYetImplemented("not yet implemented".into())),
Ok(batch_1_2.clone()),
],
Arc::clone(&schema_1),
)) as _),
Err(ArrowError::InvalidArgumentError("invalid arg".into())),
Ok(Box::pin(TestRecordBatchStream::new(
vec![Ok(batch_2.clone())],
Arc::clone(&schema_2),
)) as _),
Ok(Box::pin(TestRecordBatchStream::new(vec![], Arc::clone(&schema_3))) as _),
])),
PartitionId::new(2),
PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: Some(SequenceNumber::new(1)),
},
)),
Err(ArrowError::IoError("some io error".into())),
Ok(IngesterQueryPartition::new(
Box::pin(futures::stream::iter([])),
PartitionId::new(1),
PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
)),
])));
let actual: Vec<_> = response.flatten().collect().await;
let expected = vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(2),
status: PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: Some(SequenceNumber::new(1)),
},
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_1 }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_1 }),
Err(ArrowError::NotYetImplemented("not yet implemented".into())),
Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_2 }),
Err(ArrowError::InvalidArgumentError("invalid arg".into())),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_2 }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_2 }),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_3 }),
Err(ArrowError::IoError("some io error".into())),
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
}),
];
assert_eq!(actual.len(), expected.len());
for (actual, expected) in actual.into_iter().zip(expected) {
match (actual, expected) {
(Ok(actual), Ok(expected)) => {
assert_eq!(actual, expected);
}
(Err(_), Err(_)) => {
// `ArrowError` cannot be compared for equality; matching the Ok/Err position is sufficient here
}
(Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
(Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
}
}
}
fn lp_to_batch(lp: &str) -> RecordBatch {
lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap()
}
pub struct TestRecordBatchStream {
schema: SchemaRef,
batches: Vec<Result<RecordBatch, ArrowError>>,
}
impl TestRecordBatchStream {
pub fn new(batches: Vec<Result<RecordBatch, ArrowError>>, schema: SchemaRef) -> Self {
Self { schema, batches }
}
}
impl RecordBatchStream for TestRecordBatchStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl futures::Stream for TestRecordBatchStream {
type Item = Result<RecordBatch, ArrowError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.batches.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(self.batches.remove(0)))
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.batches.len(), Some(self.batches.len()))
}
}
}


@@ -1,7 +1,7 @@
//! gRPC service implementations for `ingester`.
use crate::{
data::{FlatIngesterQueryResponse, IngesterQueryResponse},
data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
handler::IngestHandler,
};
use arrow::error::ArrowError;
@@ -290,7 +290,7 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
},
})?;
let output = GetStream::new(query_response).await?;
let output = GetStream::new(query_response.flatten());
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
@@ -360,14 +360,12 @@ struct GetStream {
}
impl GetStream {
async fn new(query_response: IngesterQueryResponse) -> Result<Self, tonic::Status> {
let inner = query_response.flatten();
Ok(Self {
fn new(inner: FlatIngesterQueryResponseStream) -> Self {
Self {
inner,
done: false,
buffer: vec![],
})
}
}
}
@@ -458,3 +456,164 @@ fn build_none_flight_msg() -> Vec<u8> {
fbb.finished_data().to_vec()
}
#[cfg(test)]
mod tests {
use arrow::ipc::MessageHeader;
use data_types::PartitionId;
use futures::StreamExt;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::selection::Selection;
use crate::data::PartitionStatus;
use super::*;
#[tokio::test]
async fn test_get_stream_empty() {
assert_get_stream(vec![], vec![]).await;
}
#[tokio::test]
async fn test_get_stream_all_types() {
let batch = lp_to_mutable_batch("table z=1 0")
.1
.to_arrow(Selection::All)
.unwrap();
let schema = batch.schema();
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch }),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
}),
},
}),
Ok(DecodedFlightData {
header_type: MessageHeader::Schema,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_shortcuts_err() {
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
}),
Err(ArrowError::IoError("foo".into())),
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
}),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
}),
},
}),
Err(tonic::Code::Internal),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_dictionary_batches() {
let batch = lp_to_mutable_batch("table,x=\"foo\",y=\"bar\" z=1 0")
.1
.to_arrow(Selection::All)
.unwrap();
assert_get_stream(
vec![Ok(FlatIngesterQueryResponse::RecordBatch { batch })],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
struct DecodedFlightData {
header_type: MessageHeader,
app_metadata: proto::IngesterQueryResponseMetadata,
}
async fn assert_get_stream(
inputs: Vec<Result<FlatIngesterQueryResponse, ArrowError>>,
expected: Vec<Result<DecodedFlightData, tonic::Code>>,
) {
let inner = Box::pin(futures::stream::iter(inputs));
let stream = GetStream::new(inner);
let actual: Vec<_> = stream.collect().await;
assert_eq!(actual.len(), expected.len());
for (actual, expected) in actual.into_iter().zip(expected) {
match (actual, expected) {
(Ok(actual), Ok(expected)) => {
let header_type = arrow::ipc::root_as_message(&actual.data_header[..])
.unwrap()
.header_type();
assert_eq!(header_type, expected.header_type);
let app_metadata: proto::IngesterQueryResponseMetadata =
prost::Message::decode(&actual.app_metadata[..]).unwrap();
assert_eq!(app_metadata, expected.app_metadata);
}
(Err(actual), Err(expected)) => {
assert_eq!(actual.code(), expected);
}
(Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
(Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
}
}
}
}
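
For completeness, a minimal client-side sketch of how a `DoGet` consumer could classify the Flight messages that `GetStream` emits, mirroring what `assert_get_stream` above inspects. The `classify_flight_message` helper is hypothetical and uses only the arrow IPC API already exercised in these tests:

// Hypothetical helper, not part of this commit.
use arrow::ipc::MessageHeader;

fn classify_flight_message(data_header: &[u8]) -> &'static str {
    // every FlightData message carries an Arrow IPC header describing its payload
    let header_type = arrow::ipc::root_as_message(data_header)
        .expect("valid IPC message")
        .header_type();
    if header_type == MessageHeader::NONE {
        // partition marker: no Arrow payload, `app_metadata` holds the protobuf partition status
        "start of partition"
    } else if header_type == MessageHeader::Schema {
        "start of snapshot"
    } else if header_type == MessageHeader::DictionaryBatch {
        "dictionary batch preceding a record batch"
    } else if header_type == MessageHeader::RecordBatch {
        "record batch payload"
    } else {
        "unexpected message"
    }
}

The partition marker intentionally carries no Arrow payload; only its `app_metadata` (the protobuf partition status) is meaningful, which is what `test_get_stream_all_types` asserts.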