Merge pull request #5832 from influxdata/dom/move-query-types
refactor: move query types to query_handler

commit 990fa55e28
@@ -1,15 +1,12 @@
 //! Data for the lifecycle of the Ingester
 
-use std::{collections::BTreeMap, pin::Pin, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};
 
-use arrow::{error::ArrowError, record_batch::RecordBatch};
-use arrow_util::optimize::{optimize_record_batch, optimize_schema};
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
 use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, ShardIndex, TableId};
-use datafusion::physical_plan::SendableRecordBatchStream;
 use dml::DmlOperation;
 use futures::{Stream, StreamExt};
 use iox_catalog::interface::{get_table_schema_by_id, Catalog};
 use iox_query::exec::Executor;
 use iox_time::SystemProvider;
@@ -30,11 +27,7 @@ pub mod partition;
 pub(crate) mod shard;
 pub(crate) mod table;
 
-use self::{
-    partition::{resolver::PartitionProvider, PartitionStatus},
-    shard::ShardData,
-    table::TableName,
-};
+use self::{partition::resolver::PartitionProvider, shard::ShardData, table::TableName};
 
 #[cfg(test)]
 mod triggers;
@@ -482,172 +475,24 @@ impl Persister for IngesterData {
     }
 }
 
-/// Stream of snapshots.
-///
-/// Every snapshot is a dedicated [`SendableRecordBatchStream`].
-pub(crate) type SnapshotStream =
-    Pin<Box<dyn Stream<Item = Result<SendableRecordBatchStream, ArrowError>> + Send>>;
-
-/// Response data for a single partition.
-pub(crate) struct IngesterQueryPartition {
-    /// Stream of snapshots.
-    snapshots: SnapshotStream,
-
-    /// Partition ID.
-    id: PartitionId,
-
-    /// Partition persistence status.
-    status: PartitionStatus,
-}
-
-impl std::fmt::Debug for IngesterQueryPartition {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("IngesterQueryPartition")
-            .field("snapshots", &"<SNAPSHOT STREAM>")
-            .field("id", &self.id)
-            .field("status", &self.status)
-            .finish()
-    }
-}
-
-impl IngesterQueryPartition {
-    pub(crate) fn new(snapshots: SnapshotStream, id: PartitionId, status: PartitionStatus) -> Self {
-        Self {
-            snapshots,
-            id,
-            status,
-        }
-    }
-}
-
-/// Stream of partitions in this response.
-pub(crate) type IngesterQueryPartitionStream =
-    Pin<Box<dyn Stream<Item = Result<IngesterQueryPartition, ArrowError>> + Send>>;
-
-/// Response streams for querier<>ingester requests.
-///
-/// The data structure is constructed to allow lazy/streaming data generation. For easier
-/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method.
-pub struct IngesterQueryResponse {
-    /// Stream of partitions.
-    partitions: IngesterQueryPartitionStream,
-}
-
-impl std::fmt::Debug for IngesterQueryResponse {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("IngesterQueryResponse")
-            .field("partitions", &"<PARTITION STREAM>")
-            .finish()
-    }
-}
-
-impl IngesterQueryResponse {
-    /// Make a response
-    pub(crate) fn new(partitions: IngesterQueryPartitionStream) -> Self {
-        Self { partitions }
-    }
-
-    /// Flattens the data according to the wire protocol.
-    pub fn flatten(self) -> FlatIngesterQueryResponseStream {
-        self.partitions
-            .flat_map(|partition_res| match partition_res {
-                Ok(partition) => {
-                    let head = futures::stream::once(async move {
-                        Ok(FlatIngesterQueryResponse::StartPartition {
-                            partition_id: partition.id,
-                            status: partition.status,
-                        })
-                    });
-                    let tail = partition
-                        .snapshots
-                        .flat_map(|snapshot_res| match snapshot_res {
-                            Ok(snapshot) => {
-                                let schema = Arc::new(optimize_schema(&snapshot.schema()));
-
-                                let schema_captured = Arc::clone(&schema);
-                                let head = futures::stream::once(async {
-                                    Ok(FlatIngesterQueryResponse::StartSnapshot {
-                                        schema: schema_captured,
-                                    })
-                                });
-
-                                let tail = snapshot.map(move |batch_res| match batch_res {
-                                    Ok(batch) => Ok(FlatIngesterQueryResponse::RecordBatch {
-                                        batch: optimize_record_batch(&batch, Arc::clone(&schema))?,
-                                    }),
-                                    Err(e) => Err(e),
-                                });
-
-                                head.chain(tail).boxed()
-                            }
-                            Err(e) => futures::stream::once(async { Err(e) }).boxed(),
-                        });
-
-                    head.chain(tail).boxed()
-                }
-                Err(e) => futures::stream::once(async { Err(e) }).boxed(),
-            })
-            .boxed()
-    }
-}
-
-/// Flattened version of [`IngesterQueryResponse`].
-pub(crate) type FlatIngesterQueryResponseStream =
-    Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>;
-
-/// Element within the flat wire protocol.
-#[derive(Debug, PartialEq)]
-pub enum FlatIngesterQueryResponse {
-    /// Start a new partition.
-    StartPartition {
-        /// Partition ID.
-        partition_id: PartitionId,
-
-        /// Partition persistence status.
-        status: PartitionStatus,
-    },
-
-    /// Start a new snapshot.
-    ///
-    /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition)
-    /// message.
-    StartSnapshot {
-        /// Snapshot schema.
-        schema: Arc<arrow::datatypes::Schema>,
-    },
-
-    /// Add a record batch to the snapshot that was announced by the last
-    /// [`StartSnapshot`](Self::StartSnapshot) message.
-    RecordBatch {
-        /// Record batch.
-        batch: RecordBatch,
-    },
-}
-
 #[cfg(test)]
 mod tests {
-    use std::{
-        ops::DerefMut,
-        sync::Arc,
-        task::{Context, Poll},
-        time::Duration,
-    };
+    use std::{ops::DerefMut, sync::Arc, time::Duration};
 
-    use arrow::datatypes::SchemaRef;
     use assert_matches::assert_matches;
     use data_types::{
         ColumnId, ColumnSet, CompactionLevel, DeletePredicate, NamespaceSchema, NonEmptyString,
         ParquetFileParams, Sequence, Timestamp, TimestampRange,
     };
-    use datafusion::physical_plan::RecordBatchStream;
 
     use dml::{DmlDelete, DmlMeta, DmlWrite};
-    use futures::TryStreamExt;
     use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
     use iox_time::Time;
     use metric::{MetricObserver, Observation};
-    use mutable_batch_lp::{lines_to_batches, test_helpers::lp_to_mutable_batch};
+    use mutable_batch_lp::lines_to_batches;
     use object_store::memory::InMemory;
-    use schema::selection::Selection;
 
     use uuid::Uuid;
 
     use super::*;
@@ -1506,128 +1351,4 @@ mod tests {
 
         assert_eq!(progresses, expected_progresses);
     }
-
-    #[tokio::test]
-    async fn test_ingester_query_response_flatten() {
-        let batch_1_1 = lp_to_batch("table x=1 0");
-        let batch_1_2 = lp_to_batch("table x=2 1");
-        let batch_2 = lp_to_batch("table y=1 10");
-        let batch_3 = lp_to_batch("table z=1 10");
-
-        let schema_1 = batch_1_1.schema();
-        let schema_2 = batch_2.schema();
-        let schema_3 = batch_3.schema();
-
-        let response = IngesterQueryResponse::new(Box::pin(futures::stream::iter([
-            Ok(IngesterQueryPartition::new(
-                Box::pin(futures::stream::iter([
-                    Ok(Box::pin(TestRecordBatchStream::new(
-                        vec![
-                            Ok(batch_1_1.clone()),
-                            Err(ArrowError::NotYetImplemented("not yet implemented".into())),
-                            Ok(batch_1_2.clone()),
-                        ],
-                        Arc::clone(&schema_1),
-                    )) as _),
-                    Err(ArrowError::InvalidArgumentError("invalid arg".into())),
-                    Ok(Box::pin(TestRecordBatchStream::new(
-                        vec![Ok(batch_2.clone())],
-                        Arc::clone(&schema_2),
-                    )) as _),
-                    Ok(Box::pin(TestRecordBatchStream::new(vec![], Arc::clone(&schema_3))) as _),
-                ])),
-                PartitionId::new(2),
-                PartitionStatus {
-                    parquet_max_sequence_number: None,
-                },
-            )),
-            Err(ArrowError::IoError("some io error".into())),
-            Ok(IngesterQueryPartition::new(
-                Box::pin(futures::stream::iter([])),
-                PartitionId::new(1),
-                PartitionStatus {
-                    parquet_max_sequence_number: None,
-                },
-            )),
-        ])));
-
-        let actual: Vec<_> = response.flatten().collect().await;
-        let expected = vec![
-            Ok(FlatIngesterQueryResponse::StartPartition {
-                partition_id: PartitionId::new(2),
-                status: PartitionStatus {
-                    parquet_max_sequence_number: None,
-                },
-            }),
-            Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_1 }),
-            Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_1 }),
-            Err(ArrowError::NotYetImplemented("not yet implemented".into())),
-            Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_2 }),
-            Err(ArrowError::InvalidArgumentError("invalid arg".into())),
-            Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_2 }),
-            Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_2 }),
-            Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_3 }),
-            Err(ArrowError::IoError("some io error".into())),
-            Ok(FlatIngesterQueryResponse::StartPartition {
-                partition_id: PartitionId::new(1),
-                status: PartitionStatus {
-                    parquet_max_sequence_number: None,
-                },
-            }),
-        ];
-
-        assert_eq!(actual.len(), expected.len());
-        for (actual, expected) in actual.into_iter().zip(expected) {
-            match (actual, expected) {
-                (Ok(actual), Ok(expected)) => {
-                    assert_eq!(actual, expected);
-                }
-                (Err(_), Err(_)) => {
-                    // cannot compare `ArrowError`, but it's unlikely that someone changed the error
-                }
-                (Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
-                (Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
-            }
-        }
-    }
-
-    fn lp_to_batch(lp: &str) -> RecordBatch {
-        lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap()
-    }
-
-    pub struct TestRecordBatchStream {
-        schema: SchemaRef,
-        batches: Vec<Result<RecordBatch, ArrowError>>,
-    }
-
-    impl TestRecordBatchStream {
-        pub fn new(batches: Vec<Result<RecordBatch, ArrowError>>, schema: SchemaRef) -> Self {
-            Self { schema, batches }
-        }
-    }
-
-    impl RecordBatchStream for TestRecordBatchStream {
-        fn schema(&self) -> SchemaRef {
-            Arc::clone(&self.schema)
-        }
-    }
-
-    impl futures::Stream for TestRecordBatchStream {
-        type Item = Result<RecordBatch, ArrowError>;
-
-        fn poll_next(
-            mut self: std::pin::Pin<&mut Self>,
-            _: &mut Context<'_>,
-        ) -> Poll<Option<Self::Item>> {
-            if self.batches.is_empty() {
-                Poll::Ready(None)
-            } else {
-                Poll::Ready(Some(self.batches.remove(0)))
-            }
-        }
-
-        fn size_hint(&self) -> (usize, Option<usize>) {
-            (self.batches.len(), Some(self.batches.len()))
-        }
-    }
 }
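
For downstream code, the effect of the data.rs changes above is an import-path change only; the moved types keep their names and shapes. A minimal before/after sketch, using the crate-level paths that appear in the integration-test hunk at the end of this diff:

```rust
// Before this change: query-response types were exposed under `data`.
// use ingester::data::{FlatIngesterQueryResponse, IngesterQueryResponse};

// After this change: the same types come from `querier_handler`.
use ingester::querier_handler::{FlatIngesterQueryResponse, IngesterQueryResponse};
```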
@@ -14,7 +14,7 @@ use self::{
     buffer::{BufferBatch, DataBuffer},
     resolver::DeferredSortKey,
 };
-use crate::query::QueryableBatch;
+use crate::{querier_handler::PartitionStatus, query::QueryableBatch};
 
 use super::table::TableName;
@@ -30,17 +30,6 @@ pub(crate) struct UnpersistedPartitionData {
     pub(crate) partition_status: PartitionStatus,
 }
 
-/// Status of a partition that has unpersisted data.
-///
-/// Note that this structure is specific to a partition (which itself is bound to a table and
-/// shard)!
-#[derive(Debug, Clone, PartialEq, Eq)]
-#[allow(missing_copy_implementations)]
-pub struct PartitionStatus {
-    /// Max sequence number persisted
-    pub parquet_max_sequence_number: Option<SequenceNumber>,
-}
-
 /// PersistingBatch contains all needed info and data for creating
 /// a parquet file for given set of SnapshotBatches
 #[derive(Debug, PartialEq, Clone)]
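
`PartitionStatus` also moves unchanged (see the querier_handler hunk below); only the import path differs for consumers. A small illustrative construction, assuming the usual `SequenceNumber::new` constructor from `data_types`:

```rust
use data_types::SequenceNumber;
// Was `ingester::data::partition::PartitionStatus` before this change.
use ingester::querier_handler::PartitionStatus;

fn example_status() -> PartitionStatus {
    PartitionStatus {
        // `None` means no parquet data has been persisted for this partition yet.
        parquet_max_sequence_number: Some(SequenceNumber::new(42)),
    }
}
```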
@@ -6,10 +6,8 @@ use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId
 use mutable_batch::MutableBatch;
 use write_summary::ShardProgress;
 
-use super::partition::{
-    resolver::PartitionProvider, PartitionData, PartitionStatus, UnpersistedPartitionData,
-};
-use crate::lifecycle::LifecycleHandle;
+use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData};
+use crate::{lifecycle::LifecycleHandle, querier_handler::PartitionStatus};
 
 /// A double-referenced map where [`PartitionData`] can be looked up by
 /// [`PartitionKey`], or ID.
@@ -30,11 +30,11 @@ use crate::{
     data::{
         partition::resolver::{CatalogPartitionResolver, PartitionCache, PartitionProvider},
         shard::ShardData,
-        IngesterData, IngesterQueryResponse,
+        IngesterData,
     },
     lifecycle::{run_lifecycle_manager, LifecycleConfig, LifecycleManager},
     poison::PoisonCabinet,
-    querier_handler::prepare_data_to_querier,
+    querier_handler::{prepare_data_to_querier, IngesterQueryResponse},
     stream_handler::{
         handler::SequencedStreamHandler, sink_adaptor::IngestSinkAdaptor,
         sink_instrumentation::SinkInstrumentation, PeriodicWatermarkFetcher,
@@ -1,10 +1,13 @@
 //! Handle all requests from Querier
 
-use std::sync::Arc;
+use std::{pin::Pin, sync::Arc};
 
+use arrow::{error::ArrowError, record_batch::RecordBatch};
+use arrow_util::optimize::{optimize_record_batch, optimize_schema};
 use data_types::{PartitionId, SequenceNumber};
+use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::MemoryStream;
-use futures::StreamExt;
+use futures::{Stream, StreamExt};
 use generated_types::ingester::IngesterQueryRequest;
 use observability_deps::tracing::debug;
 use schema::selection::Selection;
@@ -13,7 +16,7 @@ use snafu::{ensure, Snafu};
 use crate::{
     data::{
         namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName,
-        IngesterData, IngesterQueryPartition, IngesterQueryResponse,
+        IngesterData,
     },
     query::QueryableBatch,
 };
@@ -47,6 +50,159 @@ pub enum Error {
 /// A specialized `Error` for Ingester's Query errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
+/// Stream of snapshots.
+///
+/// Every snapshot is a dedicated [`SendableRecordBatchStream`].
+pub(crate) type SnapshotStream =
+    Pin<Box<dyn Stream<Item = Result<SendableRecordBatchStream, ArrowError>> + Send>>;
+
+/// Status of a partition that has unpersisted data.
+///
+/// Note that this structure is specific to a partition (which itself is bound to a table and
+/// shard)!
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[allow(missing_copy_implementations)]
+pub struct PartitionStatus {
+    /// Max sequence number persisted
+    pub parquet_max_sequence_number: Option<SequenceNumber>,
+}
+
+/// Response data for a single partition.
+pub(crate) struct IngesterQueryPartition {
+    /// Stream of snapshots.
+    snapshots: SnapshotStream,
+
+    /// Partition ID.
+    id: PartitionId,
+
+    /// Partition persistence status.
+    status: PartitionStatus,
+}
+
+impl std::fmt::Debug for IngesterQueryPartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("IngesterQueryPartition")
+            .field("snapshots", &"<SNAPSHOT STREAM>")
+            .field("id", &self.id)
+            .field("status", &self.status)
+            .finish()
+    }
+}
+
+impl IngesterQueryPartition {
+    pub(crate) fn new(snapshots: SnapshotStream, id: PartitionId, status: PartitionStatus) -> Self {
+        Self {
+            snapshots,
+            id,
+            status,
+        }
+    }
+}
+
+/// Stream of partitions in this response.
+pub(crate) type IngesterQueryPartitionStream =
+    Pin<Box<dyn Stream<Item = Result<IngesterQueryPartition, ArrowError>> + Send>>;
+
+/// Response streams for querier<>ingester requests.
+///
+/// The data structure is constructed to allow lazy/streaming data generation. For easier
+/// consumption according to the wire protocol, use the [`flatten`](Self::flatten) method.
+pub struct IngesterQueryResponse {
+    /// Stream of partitions.
+    partitions: IngesterQueryPartitionStream,
+}
+
+impl std::fmt::Debug for IngesterQueryResponse {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("IngesterQueryResponse")
+            .field("partitions", &"<PARTITION STREAM>")
+            .finish()
+    }
+}
+
+impl IngesterQueryResponse {
+    /// Make a response
+    pub(crate) fn new(partitions: IngesterQueryPartitionStream) -> Self {
+        Self { partitions }
+    }
+
+    /// Flattens the data according to the wire protocol.
+    pub fn flatten(self) -> FlatIngesterQueryResponseStream {
+        self.partitions
+            .flat_map(|partition_res| match partition_res {
+                Ok(partition) => {
+                    let head = futures::stream::once(async move {
+                        Ok(FlatIngesterQueryResponse::StartPartition {
+                            partition_id: partition.id,
+                            status: partition.status,
+                        })
+                    });
+                    let tail = partition
+                        .snapshots
+                        .flat_map(|snapshot_res| match snapshot_res {
+                            Ok(snapshot) => {
+                                let schema = Arc::new(optimize_schema(&snapshot.schema()));
+
+                                let schema_captured = Arc::clone(&schema);
+                                let head = futures::stream::once(async {
+                                    Ok(FlatIngesterQueryResponse::StartSnapshot {
+                                        schema: schema_captured,
+                                    })
+                                });
+
+                                let tail = snapshot.map(move |batch_res| match batch_res {
+                                    Ok(batch) => Ok(FlatIngesterQueryResponse::RecordBatch {
+                                        batch: optimize_record_batch(&batch, Arc::clone(&schema))?,
+                                    }),
+                                    Err(e) => Err(e),
+                                });
+
+                                head.chain(tail).boxed()
+                            }
+                            Err(e) => futures::stream::once(async { Err(e) }).boxed(),
+                        });
+
+                    head.chain(tail).boxed()
+                }
+                Err(e) => futures::stream::once(async { Err(e) }).boxed(),
+            })
+            .boxed()
+    }
+}
+
+/// Flattened version of [`IngesterQueryResponse`].
+pub(crate) type FlatIngesterQueryResponseStream =
+    Pin<Box<dyn Stream<Item = Result<FlatIngesterQueryResponse, ArrowError>> + Send>>;
+
+/// Element within the flat wire protocol.
+#[derive(Debug, PartialEq)]
+pub enum FlatIngesterQueryResponse {
+    /// Start a new partition.
+    StartPartition {
+        /// Partition ID.
+        partition_id: PartitionId,
+
+        /// Partition persistence status.
+        status: PartitionStatus,
+    },
+
+    /// Start a new snapshot.
+    ///
+    /// The snapshot belongs to the partition of the last [`StartPartition`](Self::StartPartition)
+    /// message.
+    StartSnapshot {
+        /// Snapshot schema.
+        schema: Arc<arrow::datatypes::Schema>,
+    },
+
+    /// Add a record batch to the snapshot that was announced by the last
+    /// [`StartSnapshot`](Self::StartSnapshot) message.
+    RecordBatch {
+        /// Record batch.
+        batch: RecordBatch,
+    },
+}
+
 /// Return data to send as a response back to the Querier per its request
 pub async fn prepare_data_to_querier(
     ingest_data: &Arc<IngesterData>,
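
Taken together, the types added above define a flat, ordered wire protocol: a `StartPartition` message, then for each snapshot a `StartSnapshot` followed by zero or more `RecordBatch` messages, with errors embedded as stream items rather than terminating the stream. A minimal in-crate consumer sketch (the `print_protocol` helper is hypothetical, written only against the items shown above):

```rust
use futures::StreamExt;

// Hypothetical helper: walk the flattened response and print each protocol message.
async fn print_protocol(response: IngesterQueryResponse) {
    let mut flat = response.flatten();
    while let Some(msg) = flat.next().await {
        match msg {
            Ok(FlatIngesterQueryResponse::StartPartition { partition_id, status }) => {
                println!("start partition {:?} (status: {:?})", partition_id, status);
            }
            Ok(FlatIngesterQueryResponse::StartSnapshot { schema }) => {
                println!("start snapshot with {} columns", schema.fields().len());
            }
            Ok(FlatIngesterQueryResponse::RecordBatch { batch }) => {
                println!("record batch with {} rows", batch.num_rows());
            }
            // Errors are carried inline; the stream continues afterwards.
            Err(e) => println!("stream error: {}", e),
        }
    }
}
```

The moved `test_ingester_query_response_flatten` test below exercises exactly this ordering, including the inline errors.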
@@ -189,19 +345,106 @@ fn prepare_data_to_querier_for_partition(
 
 #[cfg(test)]
 mod tests {
-    use arrow::{array::new_null_array, record_batch::RecordBatch};
+    use std::task::{Context, Poll};
+
+    use arrow::{array::new_null_array, datatypes::SchemaRef, record_batch::RecordBatch};
     use arrow_util::assert_batches_sorted_eq;
     use assert_matches::assert_matches;
-    use datafusion::logical_plan::{col, lit};
+    use datafusion::{
+        logical_plan::{col, lit},
+        physical_plan::RecordBatchStream,
+    };
+    use futures::TryStreamExt;
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use predicate::Predicate;
     use schema::merge::SchemaMerger;
 
     use super::*;
-    use crate::{
-        data::FlatIngesterQueryResponse,
-        test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE},
-    };
+    use crate::test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE};
+
+    #[tokio::test]
+    async fn test_ingester_query_response_flatten() {
+        let batch_1_1 = lp_to_batch("table x=1 0");
+        let batch_1_2 = lp_to_batch("table x=2 1");
+        let batch_2 = lp_to_batch("table y=1 10");
+        let batch_3 = lp_to_batch("table z=1 10");
+
+        let schema_1 = batch_1_1.schema();
+        let schema_2 = batch_2.schema();
+        let schema_3 = batch_3.schema();
+
+        let response = IngesterQueryResponse::new(Box::pin(futures::stream::iter([
+            Ok(IngesterQueryPartition::new(
+                Box::pin(futures::stream::iter([
+                    Ok(Box::pin(TestRecordBatchStream::new(
+                        vec![
+                            Ok(batch_1_1.clone()),
+                            Err(ArrowError::NotYetImplemented("not yet implemented".into())),
+                            Ok(batch_1_2.clone()),
+                        ],
+                        Arc::clone(&schema_1),
+                    )) as _),
+                    Err(ArrowError::InvalidArgumentError("invalid arg".into())),
+                    Ok(Box::pin(TestRecordBatchStream::new(
+                        vec![Ok(batch_2.clone())],
+                        Arc::clone(&schema_2),
+                    )) as _),
+                    Ok(Box::pin(TestRecordBatchStream::new(vec![], Arc::clone(&schema_3))) as _),
+                ])),
+                PartitionId::new(2),
+                PartitionStatus {
+                    parquet_max_sequence_number: None,
+                },
+            )),
+            Err(ArrowError::IoError("some io error".into())),
+            Ok(IngesterQueryPartition::new(
+                Box::pin(futures::stream::iter([])),
+                PartitionId::new(1),
+                PartitionStatus {
+                    parquet_max_sequence_number: None,
+                },
+            )),
+        ])));
+
+        let actual: Vec<_> = response.flatten().collect().await;
+        let expected = vec![
+            Ok(FlatIngesterQueryResponse::StartPartition {
+                partition_id: PartitionId::new(2),
+                status: PartitionStatus {
+                    parquet_max_sequence_number: None,
+                },
+            }),
+            Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_1 }),
+            Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_1 }),
+            Err(ArrowError::NotYetImplemented("not yet implemented".into())),
+            Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_1_2 }),
+            Err(ArrowError::InvalidArgumentError("invalid arg".into())),
+            Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_2 }),
+            Ok(FlatIngesterQueryResponse::RecordBatch { batch: batch_2 }),
+            Ok(FlatIngesterQueryResponse::StartSnapshot { schema: schema_3 }),
+            Err(ArrowError::IoError("some io error".into())),
+            Ok(FlatIngesterQueryResponse::StartPartition {
+                partition_id: PartitionId::new(1),
+                status: PartitionStatus {
+                    parquet_max_sequence_number: None,
+                },
+            }),
+        ];
+
+        assert_eq!(actual.len(), expected.len());
+        for (actual, expected) in actual.into_iter().zip(expected) {
+            match (actual, expected) {
+                (Ok(actual), Ok(expected)) => {
+                    assert_eq!(actual, expected);
+                }
+                (Err(_), Err(_)) => {
+                    // cannot compare `ArrowError`, but it's unlikely that someone changed the error
+                }
+                (Ok(_), Err(_)) => panic!("Actual is Ok but expected is Err"),
+                (Err(_), Ok(_)) => panic!("Actual is Err but expected is Ok"),
+            }
+        }
+    }
+
     #[tokio::test]
     async fn test_prepare_data_to_querier() {
@@ -358,6 +601,46 @@ mod tests {
         }
     }
 
+    pub struct TestRecordBatchStream {
+        schema: SchemaRef,
+        batches: Vec<Result<RecordBatch, ArrowError>>,
+    }
+
+    impl TestRecordBatchStream {
+        pub fn new(batches: Vec<Result<RecordBatch, ArrowError>>, schema: SchemaRef) -> Self {
+            Self { schema, batches }
+        }
+    }
+
+    impl RecordBatchStream for TestRecordBatchStream {
+        fn schema(&self) -> SchemaRef {
+            Arc::clone(&self.schema)
+        }
+    }
+
+    impl futures::Stream for TestRecordBatchStream {
+        type Item = Result<RecordBatch, ArrowError>;
+
+        fn poll_next(
+            mut self: std::pin::Pin<&mut Self>,
+            _: &mut Context<'_>,
+        ) -> Poll<Option<Self::Item>> {
+            if self.batches.is_empty() {
+                Poll::Ready(None)
+            } else {
+                Poll::Ready(Some(self.batches.remove(0)))
+            }
+        }
+
+        fn size_hint(&self) -> (usize, Option<usize>) {
+            (self.batches.len(), Some(self.batches.len()))
+        }
+    }
+
+    fn lp_to_batch(lp: &str) -> RecordBatch {
+        lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap()
+    }
+
     /// Convert [`IngesterQueryResponse`] to a set of [`RecordBatch`]es.
     ///
     /// If the response contains multiple snapshots, this will merge the schemas into a single one and create
@@ -30,8 +30,8 @@ use trace::ctx::SpanContext;
 use write_summary::WriteSummary;
 
 use crate::{
-    data::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
     handler::IngestHandler,
+    querier_handler::{FlatIngesterQueryResponse, FlatIngesterQueryResponseStream},
 };
 
 /// This type is responsible for managing all gRPC services exposed by
@@ -464,8 +464,9 @@ mod tests {
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use schema::selection::Selection;
 
+    use crate::querier_handler::PartitionStatus;
+
     use super::*;
-    use crate::data::partition::PartitionStatus;
 
     #[tokio::test]
     async fn test_get_stream_empty() {
@@ -14,12 +14,9 @@ use generated_types::{
 };
 use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
 use ingester::{
-    data::{
-        partition::resolver::CatalogPartitionResolver, FlatIngesterQueryResponse, IngesterData,
-        IngesterQueryResponse, Persister,
-    },
+    data::{partition::resolver::CatalogPartitionResolver, IngesterData, Persister},
     lifecycle::mock_handle::MockLifecycleHandle,
-    querier_handler::prepare_data_to_querier,
+    querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse, IngesterQueryResponse},
 };
 use iox_catalog::interface::get_schema_by_name;
 use iox_query::exec::{Executor, ExecutorConfig};
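
End to end, the new layout means a caller builds or borrows an `IngesterData`, asks `querier_handler::prepare_data_to_querier` for a response, and flattens it for the wire. The full parameter list of `prepare_data_to_querier` is truncated in the hunks above, so the sketch below assumes a `request` parameter and a `Result<IngesterQueryResponse>` return; treat those details as illustrative rather than confirmed by this diff:

```rust
use std::sync::Arc;

use arrow::error::ArrowError;
use futures::TryStreamExt;
use generated_types::ingester::IngesterQueryRequest;
use ingester::{
    data::IngesterData,
    querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse},
};

// Assumed signature: only `ingest_data: &Arc<IngesterData>` is visible in the diff;
// the request parameter and return type are inferred from the surrounding imports.
async fn query_flat(
    ingest_data: &Arc<IngesterData>,
    request: &IngesterQueryRequest,
) -> Result<Vec<FlatIngesterQueryResponse>, ArrowError> {
    let response = prepare_data_to_querier(ingest_data, request)
        .await
        .expect("prepare_data_to_querier failed");

    // Collect the flat wire-protocol messages; inline stream errors surface here.
    response.flatten().try_collect().await
}
```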