refactor: remove iox_arrow_flight use in ingester2 (#6623)

* refactor: remove iox_arrow_flight use in ingester2

* fix: Update ingester2/src/server/grpc/query.rs

Co-authored-by: Dom <dom@itsallbroken.com>

* chore: remove unused Error enums

Co-authored-by: Dom <dom@itsallbroken.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-01-19 16:27:23 +01:00 committed by GitHub
parent f639bf3e23
commit 5b6d261396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 392 deletions

2
Cargo.lock generated
View File

@ -2684,6 +2684,7 @@ name = "ingester2"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"assert_matches",
"async-channel",
@ -2700,7 +2701,6 @@ dependencies = [
"futures",
"generated_types",
"hashbrown 0.13.2",
"iox_arrow_flight",
"iox_catalog",
"iox_query",
"iox_time",

View File

@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true }
arrow_util = { version = "0.1.0", path = "../arrow_util" }
async-channel = "1.8.0"
async-trait = "0.1.61"
@ -21,7 +22,6 @@ flatbuffers = "22"
futures = "0.3.25"
generated_types = { version = "0.1.0", path = "../generated_types" }
hashbrown.workspace = true
iox_arrow_flight = { path = "../iox_arrow_flight" }
iox_catalog = { version = "0.1.0", path = "../iox_catalog" }
iox_query = { version = "0.1.0", path = "../iox_query" }
iox_time = { path = "../iox_time" }

View File

@ -7,6 +7,7 @@ mod wal_replay;
use std::{path::PathBuf, sync::Arc, time::Duration};
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use backoff::BackoffConfig;
use futures::{future::Shared, Future, FutureExt};
use generated_types::influxdata::iox::{
@ -16,7 +17,6 @@ use generated_types::influxdata::iox::{
write_service_server::{WriteService, WriteServiceServer},
},
};
use iox_arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use observability_deps::tracing::*;

View File

@ -6,13 +6,13 @@ mod rpc_write;
use std::{fmt::Debug, sync::Arc};
use arrow_flight::flight_service_server::FlightServiceServer;
use generated_types::influxdata::iox::{
catalog::v1::catalog_service_server::CatalogServiceServer,
ingester::v1::{
persist_service_server::PersistServiceServer, write_service_server::WriteServiceServer,
},
};
use iox_arrow_flight::flight_service_server::FlightServiceServer;
use iox_catalog::interface::Catalog;
use service_grpc_catalog::CatalogService;
@ -125,7 +125,7 @@ where
/// Return an Arrow [`FlightService`] gRPC implementation.
///
/// [`FlightService`]: iox_arrow_flight::flight_service_server::FlightService
/// [`FlightService`]: arrow_flight::flight_service_server::FlightService
fn query_service(
&self,
max_simultaneous_requests: usize,

View File

@ -1,22 +1,17 @@
use std::{pin::Pin, sync::Arc, task::Poll};
use std::pin::Pin;
use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_flight::{
encode::FlightDataEncoderBuilder, error::FlightError,
flight_service_server::FlightService as Flight, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage,
PutResult, SchemaResult, Ticket,
};
use data_types::{NamespaceId, PartitionId, TableId};
use flatbuffers::FlatBufferBuilder;
use futures::{Stream, StreamExt};
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use generated_types::influxdata::iox::ingester::v1::{self as proto, PartitionStatus};
use iox_arrow_flight::{
encode::{
flight_data_from_arrow_batch, prepare_batch_for_flight, prepare_schema_for_flight,
split_batch_for_grpc_response, GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
},
flight_service_server::FlightService as Flight,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use metric::U64Counter;
use observability_deps::tracing::*;
use pin_project::pin_project;
use prost::Message;
use thiserror::Error;
use tokio::sync::{Semaphore, TryAcquireError};
@ -44,16 +39,6 @@ enum Error {
#[error("invalid flight ticket: {0}")]
InvalidTicket(#[from] prost::DecodeError),
/// The [`proto::IngesterQueryResponseMetadata`] response metadata being
/// returned to the RPC caller cannot be serialised into the protobuf
/// response format.
#[error("failed to serialise response: {0}")]
SerialiseResponse(#[from] prost::EncodeError),
/// An error was observed in the [`QueryResponse`] stream.
#[error("error streaming response: {0}")]
Stream(#[from] ArrowError),
/// The number of simultaneous queries being executed has been reached.
#[error("simultaneous query limit exceeded")]
RequestLimit,
@ -82,10 +67,6 @@ impl From<Error> for tonic::Status {
debug!(error=%e, "invalid flight query ticket");
Code::InvalidArgument
}
Error::Stream(_) | Error::SerialiseResponse(_) => {
error!(error=%e, "flight query response error");
Code::Internal
}
Error::RequestLimit => {
warn!("simultaneous query limit exceeded");
Code::ResourceExhausted
@ -149,7 +130,7 @@ where
type ListFlightsStream = TonicStream<FlightInfo>;
type DoGetStream = TonicStream<FlightData>;
type DoPutStream = TonicStream<PutResult>;
type DoActionStream = TonicStream<iox_arrow_flight::Result>;
type DoActionStream = TonicStream<arrow_flight::Result>;
type ListActionsStream = TonicStream<ActionType>;
type DoExchangeStream = TonicStream<FlightData>;
@ -205,10 +186,7 @@ where
)
.await?;
let output = FlightFrameCodec::new(
FlatIngesterQueryResponseStream::from(response),
self.ingester_id,
);
let output = encode_response(response, self.ingester_id).map_err(tonic::Status::from);
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
@ -269,201 +247,38 @@ where
}
}
/// A stream of [`FlatIngesterQueryResponse`], itself a flattened version of
/// [`QueryResponse`].
///
/// Every item is either the next flattened response element, or the
/// [`ArrowError`] that was observed while producing it. The stream is boxed,
/// pinned and `Send`.
type FlatIngesterQueryResponseStream =
Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>;
/// Element within the flat wire protocol.
///
/// The elements form an implicit hierarchy: a
/// [`StartPartition`](Self::StartPartition) opens a partition, every
/// [`StartSnapshot`](Self::StartSnapshot) belongs to the most recently opened
/// partition, and every [`RecordBatch`](Self::RecordBatch) belongs to the most
/// recently announced snapshot.
#[derive(Debug, PartialEq)]
pub enum FlatIngesterQueryResponse {
/// Start a new partition.
StartPartition {
/// Partition ID.
partition_id: PartitionId,
/// Partition persistence status.
status: PartitionStatus,
/// Count of persisted Parquet files for the [`PartitionData`] instance this
/// [`PartitionResponse`] was generated from.
///
/// [`PartitionData`]: crate::buffer_tree::partition::PartitionData
/// [`PartitionResponse`]: crate::query::partition_response::PartitionResponse
completed_persistence_count: u64,
},
/// Start a new snapshot.
///
/// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition)
/// message.
StartSnapshot {
/// Snapshot schema.
schema: Arc<arrow::datatypes::Schema>,
},
/// Add a record batch to the snapshot that was announced by the last
/// [`StartSnapshot`](Self::StartSnapshot) message.
RecordBatch {
/// Record batch.
batch: RecordBatch,
},
}
impl From<QueryResponse> for FlatIngesterQueryResponseStream {
    /// Flatten a [`QueryResponse`] into a single stream of
    /// [`FlatIngesterQueryResponse`] elements.
    ///
    /// Each partition yields one `StartPartition` element, followed (when the
    /// partition has a record batch stream) by a `StartSnapshot` element and
    /// the size-limited `RecordBatch` elements for every snapshot. An error
    /// for a snapshot is yielded in place of that snapshot's elements.
    fn from(v: QueryResponse) -> Self {
        let partitions = v.into_partition_stream();

        let flattened = partitions.flat_map(|partition| {
            let partition_id = partition.id();
            let completed_persistence_count = partition.completed_persistence_count();

            // The partition announcement always comes first.
            let partition_header = futures::stream::once(async move {
                Ok(FlatIngesterQueryResponse::StartPartition {
                    partition_id,
                    status: PartitionStatus {
                        parquet_max_sequence_number: None,
                    },
                    completed_persistence_count,
                })
            });

            match partition.into_record_batch_stream() {
                // No data for this partition: only the announcement is sent.
                None => partition_header.boxed(),
                Some(snapshots) => {
                    let snapshot_elements = snapshots.flat_map(|maybe_snapshot| {
                        match maybe_snapshot {
                            Err(e) => futures::stream::once(async { Err(e) }).boxed(),
                            Ok(snapshot) => {
                                let schema =
                                    Arc::new(prepare_schema_for_flight(&snapshot.schema()));

                                let announced_schema = Arc::clone(&schema);
                                let snapshot_header = futures::stream::once(async {
                                    Ok(FlatIngesterQueryResponse::StartSnapshot {
                                        schema: announced_schema,
                                    })
                                });

                                let prepared =
                                    prepare_batch_for_flight(&snapshot, Arc::clone(&schema))
                                        .map_err(|e| ArrowError::ExternalError(Box::new(e)));

                                let batch_elements = match prepared {
                                    Err(e) => futures::stream::once(async { Err(e) }).boxed(),
                                    Ok(prepared) => {
                                        // Keep each message within the gRPC size target.
                                        let pieces = split_batch_for_grpc_response(
                                            prepared,
                                            GRPC_TARGET_MAX_BATCH_SIZE_BYTES,
                                        );
                                        futures::stream::iter(pieces)
                                            .map(|batch| {
                                                Ok(FlatIngesterQueryResponse::RecordBatch { batch })
                                            })
                                            .boxed()
                                    }
                                };

                                snapshot_header.chain(batch_elements).boxed()
                            }
                        }
                    });

                    partition_header.chain(snapshot_elements).boxed()
                }
            }
        });

        flattened.boxed()
    }
}
/// A mapping decorator over a [`FlatIngesterQueryResponseStream`] that converts
/// it into Arrow Flight [`FlightData`] response frames.
#[pin_project]
struct FlightFrameCodec {
#[pin]
inner: Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>,
done: bool,
buffer: Vec<FlightData>,
/// Encode the partition information as a None flight data with metadata
fn encode_partition(
// Partition ID.
partition_id: PartitionId,
// Partition persistence status.
status: PartitionStatus,
// Count of persisted Parquet files for the [`PartitionData`] instance this
// [`PartitionResponse`] was generated from.
//
// [`PartitionData`]: crate::buffer_tree::partition::PartitionData
// [`PartitionResponse`]: crate::query::partition_response::PartitionResponse
completed_persistence_count: u64,
ingester_id: IngesterId,
}
) -> std::result::Result<FlightData, FlightError> {
let mut bytes = bytes::BytesMut::new();
let app_metadata = proto::IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: status.parquet_max_sequence_number,
}),
ingester_uuid: ingester_id.to_string(),
completed_persistence_count,
};
prost::Message::encode(&app_metadata, &mut bytes)
.map_err(|e| FlightError::from_external_error(Box::new(e)))?;
impl FlightFrameCodec {
    /// Build a codec over `inner` that tags partition metadata with
    /// `ingester_id`.
    ///
    /// The codec starts with an empty frame buffer and is not yet marked as
    /// done.
    fn new(inner: FlatIngesterQueryResponseStream, ingester_id: IngesterId) -> Self {
        let buffer = Vec::new();
        let done = false;

        Self {
            inner,
            done,
            buffer,
            ingester_id,
        }
    }
}
impl Stream for FlightFrameCodec {
type Item = Result<FlightData, tonic::Status>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
if !this.buffer.is_empty() {
let next = this.buffer.remove(0);
return Poll::Ready(Some(Ok(next)));
}
if *this.done {
Poll::Ready(None)
} else {
match this.inner.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
*this.done = true;
Poll::Ready(None)
}
Poll::Ready(Some(Err(e))) => {
*this.done = true;
let e = Error::Stream(e).into();
Poll::Ready(Some(Err(e)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartPartition {
partition_id,
status,
completed_persistence_count,
}))) => {
let mut bytes = bytes::BytesMut::new();
let app_metadata = proto::IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: status.parquet_max_sequence_number,
}),
ingester_uuid: this.ingester_id.to_string(),
completed_persistence_count,
};
prost::Message::encode(&app_metadata, &mut bytes).map_err(Error::from)?;
let flight_data = FlightData::new(
None,
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
);
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::StartSnapshot { schema }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
Poll::Ready(Some(Ok(flight_data)))
}
Poll::Ready(Some(Ok(FlatIngesterQueryResponse::RecordBatch { batch }))) => {
let options = arrow::ipc::writer::IpcWriteOptions::default();
let (mut flight_dictionaries, flight_batch) =
flight_data_from_arrow_batch(&batch, &options);
std::mem::swap(this.buffer, &mut flight_dictionaries);
this.buffer.push(flight_batch);
let next = this.buffer.remove(0);
Poll::Ready(Some(Ok(next)))
}
}
}
}
Ok(FlightData::new(
None,
IpcMessage(build_none_flight_msg().into()),
bytes.to_vec(),
vec![],
))
}
fn build_none_flight_msg() -> Vec<u8> {
@ -480,177 +295,50 @@ fn build_none_flight_msg() -> Vec<u8> {
fbb.finished_data().to_vec()
}
/// Encode a [`QueryResponse`] as a stream of Arrow Flight [`FlightData`]
/// response frames.
///
/// For every partition a metadata frame built by `encode_partition` is emitted
/// first; if the partition has a record batch stream, the frames produced for
/// it by [`FlightDataEncoderBuilder`] follow.
fn encode_response(
    response: QueryResponse,
    ingester_id: IngesterId,
) -> BoxStream<'static, std::result::Result<FlightData, FlightError>> {
    let partitions = response.into_partition_stream();

    let frames = partitions.flat_map(move |partition| {
        let partition_id = partition.id();
        let completed_persistence_count = partition.completed_persistence_count();

        // One metadata frame announcing the partition.
        let metadata_frame = futures::stream::once(async move {
            encode_partition(
                partition_id,
                PartitionStatus {
                    parquet_max_sequence_number: None,
                },
                completed_persistence_count,
                ingester_id,
            )
        });

        if let Some(batches) = partition.into_record_batch_stream() {
            // The encoder expects `FlightError` as its input error type.
            let batches = batches.map_err(FlightError::Arrow);
            let data_frames = FlightDataEncoderBuilder::new().build(batches);
            metadata_frame.chain(data_frames).boxed()
        } else {
            metadata_frame.boxed()
        }
    });

    frames.boxed()
}
#[cfg(test)]
mod tests {
use arrow::{error::ArrowError, ipc::MessageHeader};
use bytes::Bytes;
use data_types::PartitionId;
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::{self as proto};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::Projection;
use tonic::Code;
use crate::query::mock_query_exec::MockQueryExec;
use super::*;
/// An empty input stream produces no flight frames.
#[tokio::test]
async fn test_get_stream_empty() {
    let ingester_id = IngesterId::new();
    assert_get_stream(ingester_id, Vec::new(), Vec::new()).await;
}
#[tokio::test]
async fn test_get_stream_all_types() {
let batch = lp_to_mutable_batch("table z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
let schema = batch.schema();
let ingester_id = IngesterId::new();
assert_get_stream(
ingester_id,
vec![
Ok(FlatIngesterQueryResponse::StartPartition {
partition_id: PartitionId::new(1),
status: PartitionStatus {
parquet_max_sequence_number: None,
},
completed_persistence_count: 0,
}),
Ok(FlatIngesterQueryResponse::StartSnapshot { schema }),
Ok(FlatIngesterQueryResponse::RecordBatch { batch }),
],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::NONE,
app_metadata: proto::IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(proto::PartitionStatus {
parquet_max_sequence_number: None,
}),
completed_persistence_count: 0,
ingester_uuid: ingester_id.to_string(),
},
}),
Ok(DecodedFlightData {
header_type: MessageHeader::Schema,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
/// An error in the input ends the output: the element after the error is
/// never encoded.
#[tokio::test]
async fn test_get_stream_shortcuts_err() {
    let ingester_id = IngesterId::new();

    // The same partition announcement is placed before and after the error.
    let start_partition = || FlatIngesterQueryResponse::StartPartition {
        partition_id: PartitionId::new(1),
        status: PartitionStatus {
            parquet_max_sequence_number: None,
        },
        completed_persistence_count: 0,
    };

    let inputs = vec![
        Ok(start_partition()),
        Err(ArrowError::IoError("foo".into())),
        Ok(start_partition()),
    ];

    let expected = vec![
        Ok(DecodedFlightData {
            header_type: MessageHeader::NONE,
            app_metadata: proto::IngesterQueryResponseMetadata {
                partition_id: 1,
                status: Some(proto::PartitionStatus {
                    parquet_max_sequence_number: None,
                }),
                completed_persistence_count: 0,
                ingester_uuid: ingester_id.to_string(),
            },
        }),
        Err(tonic::Code::Internal),
    ];

    assert_get_stream(ingester_id, inputs, expected).await;
}
#[tokio::test]
async fn test_get_stream_dictionary_batches() {
let batch = lp_to_mutable_batch("table,x=\"foo\",y=\"bar\" z=1 0")
.1
.to_arrow(Projection::All)
.unwrap();
assert_get_stream(
IngesterId::new(),
vec![Ok(FlatIngesterQueryResponse::RecordBatch { batch })],
vec![
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::DictionaryBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
Ok(DecodedFlightData {
header_type: MessageHeader::RecordBatch,
app_metadata: proto::IngesterQueryResponseMetadata::default(),
}),
],
)
.await;
}
/// The parts of an encoded [`FlightData`] frame that the tests compare.
struct DecodedFlightData {
/// Header type of the IPC message found in the frame's `data_header`.
header_type: MessageHeader,
/// Response metadata decoded from the frame's `app_metadata` bytes.
app_metadata: proto::IngesterQueryResponseMetadata,
}
/// Run `inputs` through a [`FlightFrameCodec`] and compare the output with
/// `expected`.
///
/// Successful frames are compared by IPC header type and decoded app
/// metadata; errors are compared by their gRPC status code.
async fn assert_get_stream(
    ingester_id: IngesterId,
    inputs: Vec<Result<FlatIngesterQueryResponse, ArrowError>>,
    expected: Vec<Result<DecodedFlightData, tonic::Code>>,
) {
    let source = Box::pin(futures::stream::iter(inputs));
    let codec = FlightFrameCodec::new(source, ingester_id);
    let frames: Vec<_> = codec.collect().await;

    assert_eq!(frames.len(), expected.len());

    for (got, want) in frames.into_iter().zip(expected) {
        match (got, want) {
            (Ok(got), Ok(want)) => {
                let message = arrow::ipc::root_as_message(&got.data_header[..]).unwrap();
                assert_eq!(message.header_type(), want.header_type);

                let app_metadata: proto::IngesterQueryResponseMetadata =
                    prost::Message::decode(&got.app_metadata[..]).unwrap();
                assert_eq!(app_metadata, want.app_metadata);
            }
            (Err(got), Err(want)) => assert_eq!(got.code(), want),
            (Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
            (Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
        }
    }
}
#[tokio::test]
async fn limits_concurrent_queries() {
let mut flight = FlightService::new(