feat: Pass trace id through Flight requests from querier to ingester

Fixes #5723.
pull/24376/head
Carol (Nichols || Goulding) 2022-10-19 12:10:17 -04:00
parent 48fa18f346
commit 59e1c1d5b9
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
15 changed files with 194 additions and 107 deletions

4
Cargo.lock generated
View File

@ -2216,6 +2216,9 @@ dependencies = [
"tokio",
"tokio-stream",
"tonic",
"trace",
"trace_exporters",
"trace_http",
]
[[package]]
@ -3992,6 +3995,7 @@ dependencies = [
"tempfile",
"test_helpers",
"tokio",
"trace",
"workspace-hack",
]

View File

@ -38,6 +38,21 @@ pub struct SetRequestHeadersService<S> {
headers: Arc<Vec<(HeaderName, HeaderValue)>>,
}
impl<S> SetRequestHeadersService<S> {
/// Wraps `service` so that every outgoing request carries the given
/// `headers`. The headers are stored in an `Arc` so the service can be
/// cloned cheaply without copying the header list.
pub fn new(service: S, headers: Vec<(HeaderName, HeaderValue)>) -> Self {
Self {
service,
headers: Arc::new(headers),
}
}
/// Consumes the wrapper and returns the inner service together with the
/// shared header list, allowing callers to rebuild the wrapper with a
/// modified set of headers (e.g. to add a trace-context header).
pub fn into_parts(self) -> (S, Arc<Vec<(HeaderName, HeaderValue)>>) {
let SetRequestHeadersService { service, headers } = self;
(service, headers)
}
}
impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for SetRequestHeadersService<S>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>>,

View File

@ -53,7 +53,7 @@ pub struct Config {
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let mut client = flight::low_level::Client::new(connection);
let mut client = flight::low_level::Client::new(connection, None);
let Config {
namespace,
format,

View File

@ -29,7 +29,7 @@ async fn ingester_flight_api() {
let mut querier_flight = influxdb_iox_client::flight::low_level::Client::<
influxdb_iox_client::flight::generated_types::IngesterQueryRequest,
>::new(cluster.ingester().ingester_grpc_connection());
>::new(cluster.ingester().ingester_grpc_connection(), None);
let query = IngesterQueryRequest::new(
cluster.namespace().to_string(),
@ -98,7 +98,7 @@ async fn ingester_flight_api_namespace_not_found() {
let mut querier_flight = influxdb_iox_client::flight::low_level::Client::<
influxdb_iox_client::flight::generated_types::IngesterQueryRequest,
>::new(cluster.ingester().ingester_grpc_connection());
>::new(cluster.ingester().ingester_grpc_connection(), None);
let query = IngesterQueryRequest::new(
String::from("does_not_exist"),
@ -137,7 +137,7 @@ async fn ingester_flight_api_table_not_found() {
let mut querier_flight = influxdb_iox_client::flight::low_level::Client::<
influxdb_iox_client::flight::generated_types::IngesterQueryRequest,
>::new(cluster.ingester().ingester_grpc_connection());
>::new(cluster.ingester().ingester_grpc_connection(), None);
let query = IngesterQueryRequest::new(
cluster.namespace().to_string(),

View File

@ -25,3 +25,6 @@ tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-threa
tokio-stream = "0.1.11"
thiserror = "1.0.37"
tonic = { version = "0.8" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trace_http = { path = "../trace_http" }

View File

@ -1,32 +1,30 @@
//! Low-level flight client.
//!
//! This client allows more inspection of the flight messages which can be helpful to implement more advanced protocols.
//! This client allows more inspection of the flight messages which can be helpful to implement
//! more advanced protocols.
//!
//! # Protocol Usage
//!
//! The client handles flight messages as follows:
//!
//! - **None:** App metadata is extracted. Otherwise this message has no effect. This is useful to transmit metadata
//! without any actual payload.
//! - **Schema:** The schema is (re-)set. Dictionaries are cleared. App metadata is extraced and both the schema and the
//! metadata are presented to the user.
//! - **Dictionary Batch:** A new dictionary for a given column is registered. An existing dictionary for the same
//! column will be overwritten. No app metadata is extracted. This message is NOT visible to the user.
//! - **Record Batch:** Record batch is created based on the current schema and dictionaries. This fails if no schema
//! was transmitted yet. App metadata is extracted is is presented -- together with the record batch -- to the user.
//! - **None:** App metadata is extracted. Otherwise this message has no effect. This is useful to
//! transmit metadata without any actual payload.
//! - **Schema:** The schema is (re-)set. Dictionaries are cleared. App metadata is extracted and
//! both the schema and the metadata are presented to the user.
//! - **Dictionary Batch:** A new dictionary for a given column is registered. An existing
//! dictionary for the same column will be overwritten. No app metadata is extracted. This
//! message is NOT visible to the user.
//! - **Record Batch:** Record batch is created based on the current schema and dictionaries. This
//! fails if no schema was transmitted yet. App metadata is extracted and is presented -- together
//! with the record batch -- to the user.
//!
//! All other message types (at the time of writing: tensor and sparse tensor) lead to an error.
use std::{collections::HashMap, convert::TryFrom, marker::PhantomData, sync::Arc};
use super::Error;
use ::generated_types::influxdata::iox::{
ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata},
querier::v1::{AppMetadata, ReadInfo},
};
use client_util::connection::{Connection, GrpcConnection};
use futures_util::stream;
use futures_util::stream::StreamExt;
use prost::Message;
use tonic::Streaming;
use arrow::{
array::ArrayRef,
buffer::Buffer,
@ -38,9 +36,18 @@ use arrow_flight::{
flight_service_client::FlightServiceClient, utils::flight_data_to_arrow_batch, FlightData,
HandshakeRequest, Ticket,
};
use super::Error;
use client_util::connection::{Connection, GrpcConnection};
use futures_util::stream;
use futures_util::stream::StreamExt;
use prost::Message;
use rand::Rng;
use std::{collections::HashMap, convert::TryFrom, marker::PhantomData, str::FromStr, sync::Arc};
use tonic::{
codegen::http::header::{HeaderName, HeaderValue},
Streaming,
};
use trace::ctx::SpanContext;
use trace_http::ctx::format_jaeger_trace_context;
/// Metadata that can be sent during flight requests.
pub trait ClientMetadata: Message {
@ -59,9 +66,11 @@ impl ClientMetadata for IngesterQueryRequest {
/// Low-level flight client.
///
/// # Request and Response Metadata
/// The type parameter `T` -- which must implement [`ClientMetadata`] describes the request and response metadata that
/// is send and received during the flight request. The request is encoded as protobuf and send as the Flight "ticket",
/// the response is received via the so called "app metadata".
///
/// The type parameter `T` -- which must implement [`ClientMetadata`] -- describes the request and
/// response metadata that is sent and received during the flight request. The request is encoded
/// as protobuf and sent as the Flight "ticket". The response is received via the so-called "app
/// metadata".
#[derive(Debug)]
pub struct Client<T>
where
@ -76,15 +85,32 @@ where
T: ClientMetadata,
{
/// Creates a new client with the provided connection
pub fn new(connection: Connection) -> Self {
pub fn new(connection: Connection, span_context: Option<SpanContext>) -> Self {
let grpc_conn = connection.into_grpc_connection();
let grpc_conn = if let Some(ctx) = span_context {
let (service, headers) = grpc_conn.into_parts();
let mut headers: HashMap<_, _> = headers.iter().cloned().collect();
let key =
HeaderName::from_str(trace_exporters::DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME)
.unwrap();
let value = HeaderValue::from_str(&format_jaeger_trace_context(&ctx)).unwrap();
headers.insert(key, value);
GrpcConnection::new(service, headers.into_iter().collect())
} else {
grpc_conn
};
Self {
inner: FlightServiceClient::new(connection.into_grpc_connection()),
inner: FlightServiceClient::new(grpc_conn),
_phantom: PhantomData::default(),
}
}
/// Query the given database with the given SQL query, and return a
/// [`PerformQuery`] instance that streams low-level message results.
/// Query the given database with the given SQL query, and return a [`PerformQuery`] instance
/// that streams low-level message results.
pub async fn perform_query(&mut self, request: T) -> Result<PerformQuery<T::Response>, Error> {
PerformQuery::<T::Response>::new(self, request).await
}
@ -141,6 +167,7 @@ impl LowLevelMessage {
LowLevelMessage::RecordBatch(_) => panic!("Contains record batch"),
}
}
/// Unwrap schema.
pub fn unwrap_schema(self) -> Arc<Schema> {
match self {
@ -160,9 +187,8 @@ impl LowLevelMessage {
}
}
/// A struct that manages the stream of Arrow `RecordBatch` results from an
/// Arrow Flight query. Created by calling the `perform_query` method on a
/// Flight [`Client`].
/// A struct that manages the stream of Arrow `RecordBatch` results from an Arrow Flight query.
/// Created by calling the `perform_query` method on a Flight [`Client`].
#[derive(Debug)]
pub struct PerformQuery<T>
where

View File

@ -120,7 +120,7 @@ impl Client {
/// Creates a new client with the provided connection
pub fn new(connection: Connection) -> Self {
Self {
inner: LowLevelClient::new(connection),
inner: LowLevelClient::new(connection, None),
}
}

View File

@ -23,6 +23,7 @@ use tokio::{
task::{JoinError, JoinHandle},
};
use tokio_util::sync::CancellationToken;
use trace::span::{Span, SpanRecorder};
use write_buffer::core::WriteBufferReading;
use write_summary::ShardProgress;
@ -71,6 +72,7 @@ pub trait IngestHandler: Send + Sync {
async fn query(
&self,
request: IngesterQueryRequest,
span: Option<Span>,
) -> Result<IngesterQueryResponse, crate::querier_handler::Error>;
/// Return shard progress for the requested shard indexes
@ -331,7 +333,10 @@ impl IngestHandler for IngestHandlerImpl {
async fn query(
&self,
request: IngesterQueryRequest,
span: Option<Span>,
) -> Result<IngesterQueryResponse, crate::querier_handler::Error> {
let span_recorder = SpanRecorder::new(span);
// TODO(4567): move this into a instrumented query delegate
// Acquire and hold a permit for the duration of this request, or return
@ -361,7 +366,12 @@ impl IngestHandler for IngestHandlerImpl {
let t = self.time_provider.now();
let request = Arc::new(request);
let res = prepare_data_to_querier(&self.data, &request).await;
let res = prepare_data_to_querier(
&self.data,
&request,
span_recorder.child_span("ingester prepare data to querier"),
)
.await;
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
@ -668,14 +678,14 @@ mod tests {
predicate: None,
};
let res = ingester.query(request.clone()).await.unwrap_err();
let res = ingester.query(request.clone(), None).await.unwrap_err();
assert!(matches!(
res,
crate::querier_handler::Error::NamespaceNotFound { .. }
));
ingester.request_sem = Semaphore::new(0);
let res = ingester.query(request).await.unwrap_err();
let res = ingester.query(request, None).await.unwrap_err();
assert!(matches!(res, crate::querier_handler::Error::RequestLimit));
}
}

View File

@ -1,7 +1,12 @@
//! Handle all requests from Querier
use std::{pin::Pin, sync::Arc};
use crate::{
data::{
namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName,
IngesterData,
},
query::QueryableBatch,
};
use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch};
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use data_types::{PartitionId, SequenceNumber};
@ -12,14 +17,8 @@ use generated_types::ingester::IngesterQueryRequest;
use observability_deps::tracing::debug;
use schema::{merge::SchemaMerger, selection::Selection};
use snafu::{ensure, Snafu};
use crate::{
data::{
namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName,
IngesterData,
},
query::QueryableBatch,
};
use std::{pin::Pin, sync::Arc};
use trace::span::{Span, SpanRecorder};
/// Number of table data read locks that shall be acquired in parallel
const CONCURRENT_TABLE_DATA_LOCKS: usize = 10;
@ -171,12 +170,13 @@ impl IngesterQueryResponse {
/// Convert [`IngesterQueryResponse`] to a set of [`RecordBatch`]es.
///
/// If the response contains multiple snapshots, this will merge the schemas into a single one and create
/// NULL-columns for snapshots that miss columns.
/// If the response contains multiple snapshots, this will merge the schemas into a single one
/// and create NULL-columns for snapshots that miss columns.
///
/// # Panic
/// Panics if there are no batches returned at all. Also panics if the snapshot-scoped schemas do not line up with
/// the snapshot-scoped record batches.
///
/// Panics if there are no batches returned at all. Also panics if the snapshot-scoped schemas
/// do not line up with the snapshot-scoped record batches.
pub async fn into_record_batches(self) -> Vec<RecordBatch> {
let mut snapshot_schema = None;
let mut schema_merger = SchemaMerger::new();
@ -260,8 +260,12 @@ pub enum FlatIngesterQueryResponse {
pub async fn prepare_data_to_querier(
ingest_data: &Arc<IngesterData>,
request: &Arc<IngesterQueryRequest>,
span: Option<Span>,
) -> Result<IngesterQueryResponse> {
debug!(?request, "prepare_data_to_querier");
let span_recorder = SpanRecorder::new(span);
let mut tables_data = vec![];
let mut found_namespace = false;
for (shard_id, shard_data) in ingest_data.shards() {
@ -324,13 +328,18 @@ pub async fn prepare_data_to_querier(
// extract payload
let partition_id = partition.partition_id;
let status = partition.partition_status.clone();
let snapshots: Vec<_> = prepare_data_to_querier_for_partition(partition, &request)
.into_iter()
.map(Ok)
.collect();
let snapshots: Vec<_> = prepare_data_to_querier_for_partition(
partition,
&request,
span_recorder.child_span("ingester prepare data to querier for partition"),
)
.into_iter()
.map(Ok)
.collect();
// Note: include partition in `unpersisted_partitions` even when there we might filter out all the data, because
// the metadata (e.g. max persisted parquet file) is important for the querier.
// Note: include partition in `unpersisted_partitions` even when we might filter
// out all the data, because the metadata (e.g. max persisted parquet file) is
// important for the querier.
Ok(IngesterQueryPartition::new(
Box::pin(futures::stream::iter(snapshots)),
partition_id,
@ -344,7 +353,10 @@ pub async fn prepare_data_to_querier(
fn prepare_data_to_querier_for_partition(
unpersisted_partition_data: UnpersistedPartitionData,
request: &IngesterQueryRequest,
span: Option<Span>,
) -> Vec<SendableRecordBatchStream> {
let mut span_recorder = SpanRecorder::new(span);
// ------------------------------------------------
// Accumulate data
@ -368,7 +380,7 @@ fn prepare_data_to_querier_for_partition(
})
.with_data(unpersisted_partition_data.non_persisted);
queryable_batch
let streams = queryable_batch
.data
.iter()
.map(|snapshot_batch| {
@ -393,7 +405,11 @@ fn prepare_data_to_querier_for_partition(
// create stream
Box::pin(MemoryStream::new(vec![batch])) as SendableRecordBatchStream
})
.collect()
.collect();
span_recorder.ok("done");
streams
}
#[cfg(test)]
@ -501,6 +517,8 @@ mod tests {
async fn test_prepare_data_to_querier() {
test_helpers::maybe_start_logging();
let span = None;
// make 14 scenarios for ingester data
let mut scenarios = vec![];
for two_partitions in [false, true] {
@ -541,7 +559,7 @@ mod tests {
];
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let result = prepare_data_to_querier(scenario, &request)
let result = prepare_data_to_querier(scenario, &request, span.clone())
.await
.unwrap()
.into_record_batches()
@ -577,7 +595,7 @@ mod tests {
];
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let result = prepare_data_to_querier(scenario, &request)
let result = prepare_data_to_querier(scenario, &request, span.clone())
.await
.unwrap()
.into_record_batches()
@ -622,7 +640,7 @@ mod tests {
];
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let result = prepare_data_to_querier(scenario, &request)
let result = prepare_data_to_querier(scenario, &request, span.clone())
.await
.unwrap()
.into_record_batches()
@ -639,7 +657,7 @@ mod tests {
));
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let err = prepare_data_to_querier(scenario, &request)
let err = prepare_data_to_querier(scenario, &request, span.clone())
.await
.unwrap_err();
assert_matches!(err, Error::TableNotFound { .. });
@ -654,7 +672,7 @@ mod tests {
));
for (loc, scenario) in &scenarios {
println!("Location: {loc:?}");
let err = prepare_data_to_querier(scenario, &request)
let err = prepare_data_to_querier(scenario, &request, span.clone())
.await
.unwrap_err();
assert_matches!(err, Error::NamespaceNotFound { .. });

View File

@ -1,14 +1,9 @@
//! gRPC service implementations for `ingester`.
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
use crate::{
handler::IngestHandler,
querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
};
use arrow::error::ArrowError;
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
@ -25,17 +20,19 @@ use observability_deps::tracing::{debug, info, warn};
use pin_project::pin_project;
use prost::Message;
use snafu::{ResultExt, Snafu};
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use tonic::{Request, Response, Streaming};
use trace::ctx::SpanContext;
use trace::{ctx::SpanContext, span::SpanExt};
use write_summary::WriteSummary;
use crate::{
handler::IngestHandler,
querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
};
/// This type is responsible for managing all gRPC services exposed by
/// `ingester`.
/// This type is responsible for managing all gRPC services exposed by `ingester`.
#[derive(Debug, Default)]
pub struct GrpcDelegate<I: IngestHandler> {
ingest_handler: Arc<I>,
@ -47,8 +44,7 @@ pub struct GrpcDelegate<I: IngestHandler> {
}
impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the
/// specified `ingest_handler`.
/// Initialise a new [`GrpcDelegate`] passing valid requests to the specified `ingest_handler`.
pub fn new(ingest_handler: Arc<I>, test_flight_do_get_panic: Arc<AtomicU64>) -> Self {
Self {
ingest_handler,
@ -260,7 +256,7 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
&self,
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, tonic::Status> {
let _span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let ticket = request.into_inner();
let proto_query_request =
@ -272,25 +268,25 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
self.maybe_panic_in_flight_do_get();
let query_response =
self.ingest_handler
.query(query_request)
.await
.map_err(|e| match e {
crate::querier_handler::Error::NamespaceNotFound { namespace_name } => {
Error::NamespaceNotFound { namespace_name }
}
crate::querier_handler::Error::TableNotFound {
namespace_name,
table_name,
} => Error::TableNotFound {
namespace_name,
table_name,
},
_ => Error::Query {
source: Box::new(e),
},
})?;
let query_response = self
.ingest_handler
.query(query_request, span_ctx.child_span("ingest handler query"))
.await
.map_err(|e| match e {
crate::querier_handler::Error::NamespaceNotFound { namespace_name } => {
Error::NamespaceNotFound { namespace_name }
}
crate::querier_handler::Error::TableNotFound {
namespace_name,
table_name,
} => Error::TableNotFound {
namespace_name,
table_name,
},
_ => Error::Query {
source: Box::new(e),
},
})?;
let output = GetStream::new(query_response.flatten());

View File

@ -314,7 +314,7 @@ impl TestContext {
&self,
req: IngesterQueryRequest,
) -> Result<IngesterQueryResponse, ingester::querier_handler::Error> {
self.ingester.query(req).await
self.ingester.query(req, None).await
}
/// Retrieve the specified metric value.

View File

@ -8,6 +8,7 @@ use influxdb_iox_client::flight::{
use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, fmt::Debug, ops::DerefMut, sync::Arc};
use trace::ctx::SpanContext;
pub use influxdb_iox_client::flight::Error as FlightError;
@ -45,6 +46,7 @@ pub trait FlightClient: Debug + Send + Sync + 'static {
&self,
ingester_address: Arc<str>,
request: IngesterQueryRequest,
span_context: Option<SpanContext>,
) -> Result<Box<dyn QueryData>, Error>;
}
@ -90,10 +92,12 @@ impl FlightClient for FlightClientImpl {
&self,
ingester_addr: Arc<str>,
request: IngesterQueryRequest,
span_context: Option<SpanContext>,
) -> Result<Box<dyn QueryData>, Error> {
let connection = self.connect(Arc::clone(&ingester_addr)).await?;
let mut client = LowLevelFlightClient::<proto::IngesterQueryRequest>::new(connection);
let mut client =
LowLevelFlightClient::<proto::IngesterQueryRequest>::new(connection, span_context);
debug!(%ingester_addr, ?request, "Sending request to ingester");
let request: proto::IngesterQueryRequest =
@ -172,7 +176,7 @@ impl CachedConnection {
// sanity check w/ a handshake
let mut client =
LowLevelFlightClient::<proto::IngesterQueryRequest>::new(connection.clone());
LowLevelFlightClient::<proto::IngesterQueryRequest>::new(connection.clone(), None);
// make contact with the ingester
client

View File

@ -418,7 +418,13 @@ async fn execute(
};
let query_res = flight_client
.query(Arc::clone(&ingester_address), ingester_query_request)
.query(
Arc::clone(&ingester_address),
ingester_query_request,
span_recorder
.child_span("IngesterQuery")
.map(|span| span.ctx),
)
.await;
if let Err(FlightClientError::Flight {
@ -1230,7 +1236,7 @@ mod tests {
};
use test_helpers::assert_error;
use tokio::{runtime::Handle, sync::Mutex};
use trace::{span::SpanStatus, RingBufferTraceCollector};
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector};
#[tokio::test]
async fn test_flight_handshake_error() {
@ -1902,6 +1908,7 @@ mod tests {
&self,
ingester_address: Arc<str>,
_request: IngesterQueryRequest,
_span_context: Option<SpanContext>,
) -> Result<Box<dyn QueryData>, FlightClientError> {
self.responses
.lock()

View File

@ -31,6 +31,7 @@ querier = { path = "../querier" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }
trace = { path = "../trace" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]

View File

@ -41,6 +41,7 @@ use std::{
time::Duration,
};
use tokio::runtime::Handle;
use trace::ctx::SpanContext;
// Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
// & when delete predicates are applied
@ -945,11 +946,13 @@ impl IngesterFlightClient for MockIngester {
&self,
_ingester_address: Arc<str>,
request: IngesterQueryRequest,
_span_context: Option<SpanContext>,
) -> Result<Box<dyn IngesterFlightClientQueryData>, IngesterFlightClientError> {
let span = None;
// NOTE: we MUST NOT unwrap errors here because some query tests assert error behavior
// (e.g. passing predicates of wrong types)
let request = Arc::new(request);
let response = prepare_data_to_querier(&self.ingester_data, &request)
let response = prepare_data_to_querier(&self.ingester_data, &request, span)
.await
.map_err(|e| IngesterFlightClientError::Flight {
source: FlightError::ArrowError(arrow::error::ArrowError::ExternalError(Box::new(