chore: refactor ingester to use upstream arrow-flight (#6622)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-01-19 16:16:13 +01:00 committed by GitHub
parent 2920db4100
commit f639bf3e23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 130 additions and 539 deletions

4
Cargo.lock generated
View File

@ -2502,6 +2502,7 @@ name = "influxdb_iox_client"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"bytes",
"client_util",
@ -2627,6 +2628,7 @@ name = "ingester"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"assert_matches",
"async-trait",
@ -2642,7 +2644,6 @@ dependencies = [
"generated_types",
"hashbrown 0.13.2",
"hyper",
"iox_arrow_flight",
"iox_catalog",
"iox_query",
"iox_time",
@ -4558,6 +4559,7 @@ name = "query_tests"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"async-trait",
"backoff",

View File

@ -146,4 +146,4 @@ incremental = true
opt-level = 3
[profile.dev.package.similar]
opt-level = 3
opt-level = 3

View File

@ -7,11 +7,12 @@ license.workspace = true
[features]
default = ["flight", "format"]
flight = ["arrow", "iox_arrow_flight", "arrow_util", "futures-util"]
flight = ["arrow", "iox_arrow_flight", "arrow-flight", "arrow_util", "futures-util"]
format = ["arrow", "arrow_util"]
[dependencies]
arrow = { workspace = true, optional = true }
arrow-flight = { workspace = true, optional = true }
arrow_util = { path = "../arrow_util", optional = true }
bytes = "1.3"
client_util = { path = "../client_util" }

View File

@ -38,10 +38,17 @@ pub enum Error {
#[error(transparent)]
ArrowError(#[from] arrow::error::ArrowError),
/// An error involving an Arrow operation occurred.
/// An error involving an Arrow Flight operation occurred.
#[error(transparent)]
ArrowFlightError(#[from] iox_arrow_flight::FlightError),
/// An error involving an Arrow Flight operation occurred
/// (exists alongsize ArrowFlightError until
/// <https://github.com/influxdata/influxdb_iox/issues/6620>
/// is complete)
#[error(transparent)]
ApacheArrowFlightError(#[from] arrow_flight::error::FlightError),
/// The data contained invalid Flatbuffers.
#[error("Invalid Flatbuffer: `{0}`")]
InvalidFlatbuffer(String),

View File

@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1.61"
backoff = { path = "../backoff" }
@ -21,7 +22,6 @@ futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown = { workspace = true }
hyper = "0.14"
iox_arrow_flight = { path = "../iox_arrow_flight" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
iox_time = { path = "../iox_time" }

View File

@ -3,16 +3,19 @@
use std::{pin::Pin, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_flight::{
decode::{DecodedPayload, FlightDataDecoder},
encode::FlightDataEncoderBuilder,
error::FlightError,
FlightData, IpcMessage,
};
use arrow_util::test_util::equalize_batch_schemas;
use data_types::{NamespaceId, PartitionId, SequenceNumber, TableId};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
use futures::{Stream, StreamExt, TryStreamExt};
use flatbuffers::FlatBufferBuilder;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use generated_types::ingester::IngesterQueryRequest;
use iox_arrow_flight::encode::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
};
use observability_deps::tracing::*;
use schema::Projection;
use snafu::{ensure, Snafu};
@ -116,75 +119,79 @@ impl std::fmt::Debug for IngesterQueryResponse {
}
}
fn encode_partition(
// Partition ID.
partition_id: PartitionId,
// Partition persistence status.
status: PartitionStatus,
) -> std::result::Result<FlightData, FlightError> {
use generated_types::influxdata::iox::ingester::v1 as proto;
let mut bytes = bytes::BytesMut::new();
let app_metadata = proto::IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: status.parquet_max_sequence_number.map(|x| x.get()),
}),
// These fields are only used in ingester2.
ingester_uuid: String::new(),
completed_persistence_count: 0,
};
prost::Message::encode(&app_metadata, &mut bytes)
.map_err(|e| FlightError::from_external_error(Box::new(e)))?;
Ok(FlightData::new(
None,
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
))
}
fn build_none_flight_msg() -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let mut message = arrow::ipc::MessageBuilder::new(&mut fbb);
message.add_version(arrow::ipc::MetadataVersion::V5);
message.add_header_type(arrow::ipc::MessageHeader::NONE);
message.add_bodyLength(0);
let data = message.finish();
fbb.finish(data, None);
fbb.finished_data().to_vec()
}
impl IngesterQueryResponse {
/// Make a response
pub(crate) fn new(partitions: IngesterQueryPartitionStream) -> Self {
Self { partitions }
}
/// Flattens the data according to the wire protocol.
pub fn flatten(self) -> FlatIngesterQueryResponseStream {
/// Flattens the stream of IngesterPartitions into a stream of FlightData
pub fn flatten(self) -> BoxStream<'static, std::result::Result<FlightData, FlightError>> {
self.partitions
.flat_map(|partition_res| match partition_res {
Ok(partition) => {
let head = futures::stream::once(async move {
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: partition.id,
status: partition.status,
})
encode_partition(partition.id, partition.status)
});
let tail = partition
.snapshots
.flat_map(|snapshot_res| match snapshot_res {
Ok(snapshot) => {
let schema =
Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let snapshot = snapshot.map_err(FlightError::Arrow);
let schema_captured = Arc::clone(&schema);
let head = futures::stream::once(async {
Ok(FlatIngesterQueryResponse::StartSnapshot {
schema: schema_captured,
})
});
let tail = snapshot.flat_map(move |batch_res| match batch_res {
Ok(batch) => {
let batch =
prepare_batch_for_flight(&batch, Arc::clone(&schema))
.map_err(|e| {
ArrowError::ExternalError(Box::new(e))
});
match batch {
Ok(batch) => {
let batches = split_batch_for_grpc_response(
batch,
GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
);
futures::stream::iter(batches)
.map(|batch| {
Ok(FlatIngesterQueryResponse::RecordBatch {
batch,
})
})
.boxed()
}
Err(e) => {
futures::stream::once(async { Err(e) }).boxed()
}
}
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
});
head.chain(tail).boxed()
FlightDataEncoderBuilder::new().build(snapshot).boxed()
}
Err(e) => {
futures::stream::once(async { Err(FlightError::Arrow(e)) }).boxed()
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
});
head.chain(tail).boxed()
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
Err(e) => futures::stream::once(async { Err(FlightError::Arrow(e)) }).boxed(),
})
.boxed()
}
@ -202,17 +209,18 @@ impl IngesterQueryResponse {
let mut snapshot_schema = None;
let mut batches = vec![];
let mut stream = self.flatten();
let mut stream = FlightDataDecoder::new(self.flatten());
while let Some(msg) = stream.try_next().await.unwrap() {
match msg {
FlatIngesterQueryResponse::StartPartition { .. } => (),
FlatIngesterQueryResponse::RecordBatch { batch } => {
match msg.payload {
DecodedPayload::None => {}
DecodedPayload::RecordBatch(batch) => {
let last_schema = snapshot_schema.as_ref().unwrap();
assert_eq!(&batch.schema(), last_schema);
batches.push(batch);
}
FlatIngesterQueryResponse::StartSnapshot { schema } => {
snapshot_schema = Some(Arc::clone(&schema));
DecodedPayload::Schema(schema) => {
snapshot_schema = Some(schema);
}
}
}
@ -223,10 +231,6 @@ impl IngesterQueryResponse {
}
}
/// Flattened version of [`IngesterQueryResponse`].
pub type FlatIngesterQueryResponseStream =
Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>;
/// Element within the flat wire protocol.
#[derive(Debug, PartialEq)]
pub enum FlatIngesterQueryResponse {
@ -385,105 +389,15 @@ pub async fn prepare_data_to_querier(
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use arrow_util::assert_batches_sorted_eq;
use assert_matches::assert_matches;
use datafusion::{
physical_plan::RecordBatchStream,
prelude::{col, lit},
};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use datafusion::prelude::{col, lit};
use predicate::Predicate;
use super::*;
use crate::test_util::make_ingester_data;
#[tokio::test]
async fn test_ingester_query_response_flatten() {
let batch_1_1 = lp_to_batch("table x=1 0");
let batch_1_2 = lp_to_batch("table x=2 1");
let batch_2 = lp_to_batch("table y=1 10");
let batch_3 = lp_to_batch("table z=1 10");
let schema_1 = batch_1_1.schema();
let schema_2 = batch_2.schema();
let schema_3 = batch_3.schema();
let response = IngesterQueryResponse::new(Box::pin(futures::stream::iter([
Ok(IngesterQueryPartition::new(
Box::pin(futures::stream::iter([
Ok(Box::pin(TestRecordBatchStream::new(
vec![
Ok(batch_1_1.clone()),
Err(ArrowError::NotYetImplemented("not yet implemeneted".into())),
Ok(batch_1_2.clone()),
],
Arc::clone(&schema_1),
)) as _),
Err(ArrowError::InvalidArgumentError("invalid arg".into())),
Ok(Box::pin(TestRecordBatchStream::new(
vec![Ok(batch_2.clone())],
Arc::clone(&schema_2),
)) as _),
Ok(Box::pin(TestRecordBatchStream::new(vec![], Arc::clone(&schema_3))) as _),
])),
PartitionId::new(2),
PartitionStatus {
parquet_max_sequence_number: None,
},
)),
Err(ArrowError::IoError("some io error".into())),
Ok(IngesterQueryPartition::new(
Box::pin(futures::stream::iter([])),
PartitionId::new(1),
PartitionStatus {
parquet_max_sequence_number: None,
},
)),
])));
let actual: Vec<_> = response.flatten().collect().await;
let expected = vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(2),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_1 }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_1 }),
Err(ArrowError::NotYetImplemented("not yet implemeneted".into())),
Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_2 }),
Err(ArrowError::InvalidArgumentError("invalid arg".into())),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_2 }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_2 }),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_3 }),
Err(ArrowError::IoError("some io error".into())),
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
];
assert_eq!(actual.len(), expected.len());
for (actual, expected) in actual.into_iter().zip(expected) {
match (actual, expected) {
(Ok(actual), Ok(expected)) => {
assert_eq!(actual, expected);
}
(Err(_), Err(_)) => {
// cannot compare `ArrowError`, but it's unlikely that someone changed the error
}
(Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
(Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
}
}
}
#[tokio::test]
async fn test_prepare_data_to_querier() {
test_helpers::maybe_start_logging();
@ -637,44 +551,4 @@ mod tests {
assert_matches!(err, Error::NamespaceNotFound { .. });
}
}
pub struct TestRecordBatchStream {
schema: SchemaRef,
batches: Vec<Result<RecordBatch, ArrowError>>,
}
impl TestRecordBatchStream {
pub fn new(batches: Vec<Result<RecordBatch, ArrowError>>, schema: SchemaRef) -> Self {
Self { schema, batches }
}
}
impl RecordBatchStream for TestRecordBatchStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
impl futures::Stream for TestRecordBatchStream {
type Item = Result<RecordBatch, ArrowError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.batches.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(self.batches.remove(0)))
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.batches.len(), Some(self.batches.len()))
}
}
fn lp_to_batch(lp: &str) -> RecordBatch {
lp_to_mutable_batch(lp).1.to_arrow(Projection::All).unwrap()
}
}

View File

@ -6,13 +6,13 @@ mod write_info;
use std::sync::{atomic::AtomicU64, Arc};
use arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use generated_types::influxdata::iox::{
catalog::v1::*,
ingester::v1::write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
};
use iox_arrow_flight::flight_service_server::{
FlightService as Flight, FlightServiceServer as FlightServer,
};
use iox_catalog::interface::Catalog;
use service_grpc_catalog::CatalogService;

View File

@ -4,30 +4,24 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use arrow::error::ArrowError;
use data_types::{NamespaceId, TableId};
use flatbuffers::FlatBufferBuilder;
use futures::Stream;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use iox_arrow_flight::{
encode::flight_data_from_arrow_batch, flight_service_server::FlightService as Flight, Action,
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
use arrow_flight::{
flight_service_server::FlightService as Flight, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult,
SchemaResult, Ticket,
};
use data_types::{NamespaceId, TableId};
use futures::{Stream, TryStreamExt};
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use observability_deps::tracing::*;
use pin_project::pin_project;
use prost::Message;
use snafu::{ResultExt, Snafu};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
use crate::{
handler::IngestHandler,
querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
};
use crate::handler::IngestHandler;
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@ -157,7 +151,7 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
type ListFlightsStream = TonicStream<FlightInfo>;
type DoGetStream = TonicStream<FlightData>;
type DoPutStream = TonicStream<PutResult>;
type DoActionStream = TonicStream<iox_arrow_flight::Result>;
type DoActionStream = TonicStream<arrow_flight::Result>;
type ListActionsStream = TonicStream<ActionType>;
type DoExchangeStream = TonicStream<FlightData>;
@ -204,8 +198,10 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
},
})?;
let output = GetStream::new(query_response.flatten());
let output = query_response
.flatten()
// map FlightError --> tonic::Status
.map_err(|e| e.into());
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
@ -264,271 +260,3 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
#[pin_project]
struct GetStream {
#[pin]
inner: Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>,
done: bool,
buffer: Vec<FlightData>,
}
impl GetStream {
fn new(inner: FlatIngesterQueryResponseStream) -> Self {
Self {
inner,
done: false,
buffer: vec![],
}
}
}
impl Stream for GetStream {
type Item = Result<FlightData, tonic::Status>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
if !this.buffer.is_empty() {
let next = this.buffer.remove(0);
return Poll::Ready(Some(Ok(next)));
}
if *this.done {
Poll::Ready(None)
} else {
match this.inner.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
*this.done = true;
Poll::Ready(None)
}
Poll::Ready(Some(Err(e))) => {
*this.done = true;
let e = Error::QueryStream { source: e }.into();
Poll::Ready(Some(Err(e)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartPartition {
partition_id,
status,
}))) => {
let mut bytes = bytes::BytesMut::new();
let app_metadata = proto::IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: status
.parquet_max_sequence_number
.map(|x| x.get()),
}),
// These fields are only used in ingester2.
ingester_uuid: String::new(),
completed_persistence_count: 0,
};
prost::Message::encode(&app_metadata, &mut bytes)
.context(SerializationSnafu)?;
let flight_data = FlightData::new(
None,
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
);
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartSnapshot { schema }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::RecordBatch { batch }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let (mut flight_dictionaries, flight_batch) =
flight_data_from_arrow_batch(&batch, &options);
std::mem::swap(this.buffer, &mut flight_dictionaries);
this.buffer.push(flight_batch);
let next = this.buffer.remove(0);
Poll::Ready(Some(Ok(next)))
}
}
}
}
}
fn build_none_flight_msg() -> Vec<u8> {
let mut fbb = FlatBufferBuilder::new();
let mut message = arrow::ipc::MessageBuilder::new(&mut fbb);
message.add_version(arrow::ipc::MetadataVersion::V5);
message.add_header_type(arrow::ipc::MessageHeader::NONE);
message.add_bodyLength(0);
let data = message.finish();
fbb.finish(data, None);
fbb.finished_data().to_vec()
}
#[cfg(test)]
mod tests {
use arrow::{error::ArrowError, ipc::MessageHeader};
use data_types::PartitionId;
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::Projection;
use super::*;
use crate::querier_handler::{FlatIngesterQueryResponse, PartitionStatus};
#[tokio::test]
async fn test_get_stream_empty() {
assert_get_stream(vec![], vec![]).await;
}
#[tokio::test]
async fn test_get_stream_all_types() {
let batch = lp_to_mutable_batch("table z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
let schema = batch.schema();
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch }),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
// These fields are only used in ingester2.
ingester_uuid: String::new(),
completed_persistence_count: 0,
},
}),
Ok(DecodedFlightData {
header_type: MessageHeader::Schema,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_shortcuts_err() {
assert_get_stream(
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
Err(ArrowError::IoError("foo".into())),
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
}),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
// These fields are only used in ingester2.
ingester_uuid: String::new(),
completed_persistence_count: 0,
},
}),
Err(tonic::Code::Internal),
],
)
.await;
}
#[tokio::test]
async fn test_get_stream_dictionary_batches() {
let batch = lp_to_mutable_batch("table,x=\"foo\",y=\"bar\" z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
assert_get_stream(
vec![Ok(FlatIngesterQueryResponse::RecordBatch { batch })],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
struct DecodedFlightData {
header_type: MessageHeader,
app_metadata: proto::IngesterQueryResponseMetadata,
}
async fn assert_get_stream(
inputs: Vec<Result<FlatIngesterQueryResponse, ArrowError>>,
expected: Vec<Result<DecodedFlightData, tonic::Code>>,
) {
let inner = Box::pin(futures::stream::iter(inputs));
let stream = GetStream::new(inner);
let actual: Vec<_> = stream.collect().await;
assert_eq!(actual.len(), expected.len());
for (actual, expected) in actual.into_iter().zip(expected) {
match (actual, expected) {
(Ok(actual), Ok(expected)) => {
let header_type = arrow::ipc::root_as_message(&actual.data_header[..])
.unwrap()
.header_type();
assert_eq!(header_type, expected.header_type);
let app_metadata: proto::IngesterQueryResponseMetadata =
prost::Message::decode(&actual.app_metadata[..]).unwrap();
assert_eq!(app_metadata, expected.app_metadata);
}
(Err(actual), Err(expected)) => {
assert_eq!(actual.code(), expected);
}
(Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
(Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
}
}
}
}

View File

@ -8,6 +8,7 @@ license.workspace = true
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true }
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
backoff = { path = "../backoff" }

View File

@ -1,5 +1,6 @@
//! This module contains util functions for testing scenarios
use super::DbScenario;
use arrow_flight::decode::FlightDataDecoder;
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{
@ -9,14 +10,13 @@ use data_types::{
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::StreamExt;
use generated_types::{
influxdata::iox::ingester::v1::{IngesterQueryResponseMetadata, PartitionStatus},
ingester::IngesterQueryRequest,
influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, ingester::IngesterQueryRequest,
};
use influxdb_iox_client::flight::Error as FlightError;
use ingester::{
data::{DmlApplyAction, IngesterData, Persister},
lifecycle::mock_handle::MockLifecycleHandle,
querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse, IngesterQueryResponse},
querier_handler::{prepare_data_to_querier, IngesterQueryResponse},
};
use iox_arrow_flight::DecodedPayload;
use iox_catalog::interface::get_schema_by_name;
@ -998,10 +998,7 @@ impl IngesterFlightClient for MockIngester {
/// Helper struct to present [`IngesterQueryResponse`] (produces by the ingester) as a
/// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO.
struct QueryDataAdapter {
messages: Box<
dyn Iterator<Item = Result<(DecodedPayload, IngesterQueryResponseMetadata), FlightError>>
+ Send,
>,
inner: FlightDataDecoder,
}
impl std::fmt::Debug for QueryDataAdapter {
@ -1012,52 +1009,10 @@ impl std::fmt::Debug for QueryDataAdapter {
impl QueryDataAdapter {
/// Create new adapter.
///
/// This pre-calculates some data structure that we are going to need later.
async fn new(response: IngesterQueryResponse) -> Self {
let mut messages = vec![];
let mut stream = response.flatten();
while let Some(msg_res) = stream.next().await {
match msg_res {
Ok(msg) => {
let (msg, md) = match msg {
FlatIngesterQueryResponse::StartPartition {
partition_id,
status,
} => (
DecodedPayload::None,
IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(PartitionStatus {
parquet_max_sequence_number: status
.parquet_max_sequence_number
.map(|x| x.get()),
}),
// These fields are only used in ingester2.
ingester_uuid: String::new(),
completed_persistence_count: 0,
},
),
FlatIngesterQueryResponse::StartSnapshot { schema } => (
DecodedPayload::Schema(schema),
IngesterQueryResponseMetadata::default(),
),
FlatIngesterQueryResponse::RecordBatch { batch } => (
DecodedPayload::RecordBatch(batch),
IngesterQueryResponseMetadata::default(),
),
};
messages.push(Ok((msg, md)));
}
Err(e) => {
messages.push(Err(FlightError::ArrowError(e)));
}
}
}
let inner = FlightDataDecoder::new(response.flatten());
Self {
messages: Box::new(messages.into_iter()),
}
Self { inner }
}
}
@ -1066,6 +1021,29 @@ impl IngesterFlightClientQueryData for QueryDataAdapter {
async fn next_message(
&mut self,
) -> Result<Option<(DecodedPayload, IngesterQueryResponseMetadata)>, FlightError> {
self.messages.next().transpose()
match self.inner.next().await {
None => Ok(None),
Some(Ok(data)) => {
let arrow_flight::decode::DecodedFlightData { inner, payload } = data;
// decode the app metadata
let app_metadata = &inner.app_metadata[..];
let app_metadata: IngesterQueryResponseMetadata =
iox_arrow_flight::prost::Message::decode(app_metadata).unwrap();
let payload = match payload {
// translate from apache crate to iox_arrow_flight versions
arrow_flight::decode::DecodedPayload::None => DecodedPayload::None,
arrow_flight::decode::DecodedPayload::RecordBatch(batch) => {
DecodedPayload::RecordBatch(batch)
}
arrow_flight::decode::DecodedPayload::Schema(schema) => {
DecodedPayload::Schema(schema)
}
};
let res = (payload, app_metadata);
Ok(Some(res))
}
Some(Err(e)) => Err(FlightError::ApacheArrowFlightError(e)),
}
}
}