refactor: prepare new ingester<>querier protocol on the querier side (#4863)

* refactor: prepare new ingester<>querier protocol on the querier side

This changes the querier internals to work with the new protocol. The
wire protocol stays the same (for now). There's a (somewhat hackish)
adapter in place on the querier side that converts the old to the new
protocol on-the-fly. This is an intermediate step before we actually
change the wire protocol (and in a step after that also take advantage
of the new possibilities on the ingester side).

Ref #4849.

* docs: explain adapter
pull/24376/head
Marco Neumann 2022-06-15 16:32:24 +02:00 committed by GitHub
parent 3bd24b67ba
commit 7c60edd38c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 460 additions and 250 deletions

View File

@ -38,8 +38,7 @@ message IngesterQueryRequest {
}
// Metadata that the ingester provides to the query service along with the results. Serialized
// in the FlightData's app_metadata for the schema that is returned as the first item from the
// Arrow Flight doGet request.
// in every FlightData's app_metadata.
message IngesterQueryResponseMetadata {
// There was no field 1, oops.
reserved 1;
@ -65,6 +64,18 @@ message IngesterQueryResponseMetadata {
// Map each record batch to a partition ID.
repeated int64 batch_partition_ids = 6;
// Partition id for this batch.
//
// This field is currently NOT used by the ingester but will be soon.
int64 partition_id = 7;
// Optional partition status.
//
// If this is given, then no schema and no batch will be part of this FlightData object.
//
// This field is currently NOT used by the ingester but will be soon.
PartitionStatus status = 8;
}
// Status of a partition that has unpersisted data.

View File

@ -398,6 +398,7 @@ impl GetStream {
})
.collect(),
batch_partition_ids: batch_partition_ids.into_iter().map(|id| id.get()).collect(),
..Default::default()
};
prost::Message::encode(&app_metadata, &mut bytes).context(SerializationSnafu)?;
schema_flight_data.app_metadata = bytes.to_vec();

View File

@ -1,6 +1,7 @@
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use async_trait::async_trait;
use client_util::connection::{self, Connection};
use data_types::PartitionId;
use generated_types::ingester::IngesterQueryRequest;
use influxdb_iox_client::flight::{
generated_types as proto,
@ -8,12 +9,7 @@ use influxdb_iox_client::flight::{
};
use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};
use std::{
collections::HashMap,
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};
use std::{collections::HashMap, fmt::Debug, ops::DerefMut, sync::Arc};
pub use influxdb_iox_client::flight::Error as FlightError;
@ -119,6 +115,8 @@ impl FlightClient for FlightClientImpl {
inner: perform_query,
schema,
app_metadata,
batch_counter: 0,
state: Default::default(),
}))
}
}
@ -128,15 +126,11 @@ impl FlightClient for FlightClientImpl {
/// This is mostly the same as [`PerformQuery`] but allows some easier mocking.
#[async_trait]
pub trait QueryData: Debug + Send + 'static {
/// Returns the next `RecordBatch` available for this query, or `None` if
/// Returns the next [`LowLevelMessage`] available for this query, or `None` if
/// there are no further results available.
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError>;
/// App metadata that was part of the response.
fn app_metadata(&self) -> &proto::IngesterQueryResponseMetadata;
/// Schema.
fn schema(&self) -> Arc<Schema>;
async fn next(
&mut self,
) -> Result<Option<(LowLevelMessage, proto::IngesterQueryResponseMetadata)>, FlightError>;
}
#[async_trait]
@ -144,16 +138,67 @@ impl<T> QueryData for Box<T>
where
T: QueryData + ?Sized,
{
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
async fn next(
&mut self,
) -> Result<Option<(LowLevelMessage, proto::IngesterQueryResponseMetadata)>, FlightError> {
self.deref_mut().next().await
}
}
fn app_metadata(&self) -> &proto::IngesterQueryResponseMetadata {
self.deref().app_metadata()
}
/// Protocol state.
///
/// Transitions driven by the `QueryData::next` implementation for `PerformQueryAdapter`:
///
/// ```text
/// NoPartitionYet     --batch read-->           NewPartition
/// NoPartitionYet     --stream exhausted-->     End
/// NewPartition       --announcement emitted--> PartitionAnnounced
/// PartitionAnnounced --schema emitted-->       SchemaAnnounced
/// SchemaAnnounced    --batch emitted-->        BatchTransmitted
/// BatchTransmitted   --same partition-->       PartitionAnnounced
/// BatchTransmitted   --new partition-->        NewPartition
/// BatchTransmitted   --stream exhausted-->     End
/// End                --leftover partition announced--> End
/// End                --nothing left-->         (returns `None`)
/// ```
#[derive(Debug)]
enum PerformQueryAdapterState {
    /// Initial state: nothing has been pulled from the underlying stream yet.
    NoPartitionYet,
    /// A batch was read that belongs to a partition not yet announced to the consumer.
    NewPartition {
        partition_id: PartitionId,
        batch: RecordBatch,
    },
    /// The partition announcement was emitted; the schema for the pending batch is next.
    PartitionAnnounced {
        partition_id: PartitionId,
        batch: RecordBatch,
    },
    /// The schema was emitted; the pending batch itself is next.
    SchemaAnnounced {
        partition_id: PartitionId,
        batch: RecordBatch,
    },
    /// The batch was emitted; the next call pulls another batch (or starts the end drain).
    BatchTransmitted {
        partition_id: PartitionId,
    },
    /// Underlying stream is exhausted; remaining unpersisted partitions (those that produced
    /// no batches) are announced one by one, then the adapter yields `None`.
    End,
}
fn schema(&self) -> Arc<Schema> {
self.deref().schema()
/// The adapter starts in [`PerformQueryAdapterState::NoPartitionYet`]. This is also the state
/// left behind by `std::mem::take` while a transition is being processed.
impl Default for PerformQueryAdapterState {
    fn default() -> Self {
        Self::NoPartitionYet
    }
}
@ -162,18 +207,25 @@ struct PerformQueryAdapter {
inner: PerformQuery<proto::IngesterQueryResponseMetadata>,
app_metadata: proto::IngesterQueryResponseMetadata,
schema: Arc<Schema>,
batch_counter: usize,
state: PerformQueryAdapterState,
}
#[async_trait]
impl QueryData for PerformQueryAdapter {
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
impl PerformQueryAdapter {
/// Get next batch from underlying [`PerformQuery`] along with a batch index (which in turn can be used to
/// look up the batch partition).
///
/// Returns `Ok(None)` if the stream has ended.
async fn next_batch(&mut self) -> Result<Option<(RecordBatch, usize)>, FlightError> {
loop {
match self.inner.next().await? {
None => {
return Ok(None);
}
Some((LowLevelMessage::RecordBatch(batch), _)) => {
return Ok(Some(batch));
let pos = self.batch_counter;
self.batch_counter += 1;
return Ok(Some((batch, pos)));
}
// ignore all other message types for now
Some((LowLevelMessage::None | LowLevelMessage::Schema(_), _)) => (),
@ -181,12 +233,118 @@ impl QueryData for PerformQueryAdapter {
}
}
fn app_metadata(&self) -> &proto::IngesterQueryResponseMetadata {
&self.app_metadata
/// Announce new partition (id and its persistence status).
///
/// Emits a [`LowLevelMessage::None`] marker together with metadata that carries the partition
/// ID and its status. The status is *removed* from the old-protocol `unpersisted_partitions`
/// map so the end-of-stream drain does not announce the same partition twice.
///
/// # Panics
/// Panics (via `unwrap`) if `partition_id` is not (or no longer) present in
/// `unpersisted_partitions` — this assumes every batch partition ID is also listed in that
/// map and that each partition's batches arrive contiguously. TODO(review): confirm the
/// ingester guarantees both.
fn announce_partition(
    &mut self,
    partition_id: PartitionId,
) -> (LowLevelMessage, proto::IngesterQueryResponseMetadata) {
    let meta = proto::IngesterQueryResponseMetadata {
        partition_id: partition_id.get(),
        status: Some(
            self.app_metadata
                .unpersisted_partitions
                .remove(&partition_id.get())
                .unwrap(),
        ),
        ..Default::default()
    };
    (LowLevelMessage::None, meta)
}
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(&self.schema)
#[async_trait]
impl QueryData for PerformQueryAdapter {
    /// Drive the protocol state machine one step per emitted message, converting the old
    /// "metadata up-front" protocol into the new per-partition sequence of
    /// announcement (`None`) -> schema -> batch messages.
    async fn next(
        &mut self,
    ) -> Result<Option<(LowLevelMessage, proto::IngesterQueryResponseMetadata)>, FlightError> {
        // NOTE(review): `mem::take` resets `state` to the default (`NoPartitionYet`) while an
        // arm runs; if `next_batch` returns an error (propagated via `?`), the adapter is left
        // in that default state rather than the one it was in — confirm this is acceptable.
        loop {
            match std::mem::take(&mut self.state) {
                // Underlying stream is done: announce any partitions that produced no batches.
                // `announce_partition` removes the entry from `unpersisted_partitions`, so this
                // drain makes progress and eventually returns `None`.
                PerformQueryAdapterState::End => {
                    if let Some(partition_id) =
                        self.app_metadata.unpersisted_partitions.keys().next()
                    {
                        let partition_id = PartitionId::new(*partition_id);
                        return Ok(Some(self.announce_partition(partition_id)));
                    } else {
                        return Ok(None);
                    }
                }
                // Pull the very first batch and map it to its partition via the old-protocol
                // positional `batch_partition_ids` lookup.
                PerformQueryAdapterState::NoPartitionYet => {
                    let (batch, pos) = if let Some(x) = self.next_batch().await? {
                        x
                    } else {
                        self.state = PerformQueryAdapterState::End;
                        continue;
                    };
                    let partition_id = PartitionId::new(self.app_metadata.batch_partition_ids[pos]);
                    self.state = PerformQueryAdapterState::NewPartition {
                        partition_id,
                        batch,
                    };
                    continue;
                }
                // Emit the partition announcement; the batch stays pending in the state.
                PerformQueryAdapterState::NewPartition {
                    partition_id,
                    batch,
                } => {
                    self.state = PerformQueryAdapterState::PartitionAnnounced {
                        partition_id,
                        batch,
                    };
                    return Ok(Some(self.announce_partition(partition_id)));
                }
                // Emit the schema. NOTE(review): this is the single query-wide schema stored on
                // the adapter, not the pending batch's own schema — presumably fine for the old
                // protocol, but confirm once batches can have per-partition schemas.
                PerformQueryAdapterState::PartitionAnnounced {
                    partition_id,
                    batch,
                } => {
                    self.state = PerformQueryAdapterState::SchemaAnnounced {
                        partition_id,
                        batch,
                    };
                    let meta = Default::default();
                    return Ok(Some((
                        LowLevelMessage::Schema(Arc::clone(&self.schema)),
                        meta,
                    )));
                }
                // Emit the pending batch itself.
                PerformQueryAdapterState::SchemaAnnounced {
                    partition_id,
                    batch,
                } => {
                    self.state = PerformQueryAdapterState::BatchTransmitted { partition_id };
                    let meta = Default::default();
                    return Ok(Some((LowLevelMessage::RecordBatch(batch), meta)));
                }
                // Fetch the next batch: same partition -> re-emit schema then batch; different
                // partition -> announce the new partition first; stream done -> end drain.
                PerformQueryAdapterState::BatchTransmitted { partition_id } => {
                    let (batch, pos) = if let Some(x) = self.next_batch().await? {
                        x
                    } else {
                        self.state = PerformQueryAdapterState::End;
                        continue;
                    };
                    let partition_id2 =
                        PartitionId::new(self.app_metadata.batch_partition_ids[pos]);
                    if partition_id == partition_id2 {
                        self.state = PerformQueryAdapterState::PartitionAnnounced {
                            partition_id,
                            batch,
                        };
                        continue;
                    } else {
                        self.state = PerformQueryAdapterState::NewPartition {
                            partition_id: partition_id2,
                            batch,
                        };
                        continue;
                    }
                }
            }
        }
    }
}

View File

@ -17,6 +17,7 @@ use generated_types::{
ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest},
write_info::merge_responses,
};
use influxdb_iox_client::flight::low_level::LowLevelMessage;
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::compute_timenanosecond_min_max,
@ -69,34 +70,6 @@ pub enum Error {
source: FlightClientError,
},
#[snafu(display("Batch {} is missing partition id from '{}'", pos, ingester_address))]
MissingPartitionId {
pos: usize,
ingester_address: String,
},
#[snafu(display(
"Got too many partition IDs from '{}', expected {} but got {}",
ingester_address,
expected,
actual
))]
TooManyPartitionIds {
expected: usize,
actual: usize,
ingester_address: String,
},
#[snafu(display(
"Got batch for partition id {} that was not marked as unpersisted from '{}'",
partition_id,
ingester_address,
))]
UnknownPartitionId {
partition_id: i64,
ingester_address: String,
},
#[snafu(display("Failed to connect to ingester '{}': {}", ingester_address, source))]
Connecting {
ingester_address: String,
@ -114,6 +87,27 @@ pub enum Error {
write_token: String,
source: influxdb_iox_client::error::Error,
},
#[snafu(display(
"Partition status missing for partition {partition_id}, ingestger: {ingester_address}"
))]
PartitionStatusMissing {
partition_id: i64,
ingester_address: String,
},
#[snafu(display(
"Got batch without partition information from ingestger: {ingester_address}"
))]
BatchWithoutPartition { ingester_address: String },
#[snafu(display(
"Duplicate partition info for partition {partition_id}, ingestger: {ingester_address}"
))]
DuplicatePartitionInfo {
partition_id: i64,
ingester_address: String,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -271,17 +265,12 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<IngesterPar
// read unpersisted partitions
// map partition_id -> (PartitionMetadata, Vec<PartitionData>))
let mut partitions: HashMap<_, _> = perform_query
.app_metadata()
.unpersisted_partitions
.iter()
.map(|(id, state)| (*id, (state.clone(), vec![])))
.collect();
let mut partitions: HashMap<_, _> = HashMap::new();
// sort batches into partitions
let mut num_batches = 0usize;
let partition_ids = perform_query.app_metadata().batch_partition_ids.clone();
while let Some(batch) = perform_query
let mut current_partition_id = None;
while let Some((msg, md)) = perform_query
.next()
.await
.map_err(|source| FlightClientError::Flight { source })
@ -289,32 +278,41 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<IngesterPar
ingester_address: ingester_address.as_ref(),
})?
{
let partition_id = *partition_ids
.get(num_batches)
.context(MissingPartitionIdSnafu {
pos: num_batches,
ingester_address: ingester_address.as_ref(),
})?;
partitions
.get_mut(&partition_id)
.context(UnknownPartitionIdSnafu {
partition_id,
ingester_address: ingester_address.as_ref(),
})?
.1
.push(batch);
num_batches += 1;
match msg {
LowLevelMessage::None => {
// new partition announced
let partition_id = md.partition_id;
let status = md.status.context(PartitionStatusMissingSnafu {
partition_id,
ingester_address: ingester_address.as_ref(),
})?;
let existing = partitions.insert(partition_id, (status, vec![]));
ensure!(
existing.is_none(),
DuplicatePartitionInfoSnafu {
partition_id,
ingester_address: ingester_address.as_ref()
},
);
current_partition_id = Some(partition_id);
}
LowLevelMessage::Schema(_) => {
// can be ignored, schemas are propagated to the batches automatically by the low-level client
}
LowLevelMessage::RecordBatch(batch) => {
let partition_id = current_partition_id.context(BatchWithoutPartitionSnafu {
ingester_address: ingester_address.as_ref(),
})?;
partitions
.get_mut(&partition_id)
.expect("current partition should have been inserted")
.1
.push(batch);
num_batches += 1;
}
}
}
debug!(%ingester_address, num_batches, "Received batches from ingester");
trace!(?partitions, schema=?perform_query.schema(), "Detailed from ingester");
ensure!(
num_batches == partition_ids.len(),
TooManyPartitionIdsSnafu {
actual: partition_ids.len(),
expected: num_batches,
ingester_address: ingester_address.as_ref(),
},
);
let mut ingester_partitions = vec![];
for (partition_id, (state, batches)) in partitions {
@ -330,6 +328,7 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<IngesterPar
Arc::clone(&table_name),
partition_id,
sequencer_id,
// TODO(marco): project schema to the columns that are present within this partition
Arc::clone(&expected_schema),
state.parquet_max_sequence_number.map(SequenceNumber::new),
state.tombstone_max_sequence_number.map(SequenceNumber::new),
@ -809,7 +808,7 @@ fn calculate_summary(batches: &[RecordBatch], schema: &Schema) -> TableSummary {
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use arrow::{
array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray},
@ -883,11 +882,6 @@ mod tests {
results: vec![Err(FlightError::GrpcError(tonic::Status::internal(
"don't know",
)))],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::new(),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
)])
.await,
@ -900,18 +894,7 @@ mod tests {
#[tokio::test]
async fn test_flight_no_partitions() {
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::new(),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
)])
.await,
MockFlightClient::new([("addr1", Ok(MockQueryData { results: vec![] }))]).await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn).await.unwrap();
@ -924,18 +907,17 @@ mod tests {
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
results: vec![Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
)]),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
..Default::default()
},
))],
}),
)])
.await,
@ -954,142 +936,189 @@ mod tests {
}
#[tokio::test]
async fn test_flight_err_missing_partition_id() {
let record_batch = lp_to_record_batch("table foo=1 1");
async fn test_flight_err_partition_status_missing() {
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![Ok(record_batch)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: Some(1),
tombstone_max_sequence_number: Some(2),
},
)]),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
results: vec![Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 1,
status: None,
..Default::default()
},
))],
}),
)])
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::MissingPartitionId { .. });
assert_matches!(err, Error::PartitionStatusMissing { .. });
}
#[tokio::test]
async fn test_flight_err_too_many_partition_ids() {
let record_batch = lp_to_record_batch("table foo=1 1");
async fn test_flight_err_duplicate_partition_info() {
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![Ok(record_batch)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: Some(1),
tombstone_max_sequence_number: Some(2),
results: vec![
Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
}),
..Default::default()
},
)]),
batch_partition_ids: vec![1, 2],
},
schema: schema().as_arrow(),
)),
Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 2,
status: Some(PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
}),
..Default::default()
},
)),
Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
}),
..Default::default()
},
)),
],
}),
)])
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::TooManyPartitionIds { .. });
assert_matches!(err, Error::DuplicatePartitionInfo { .. });
}
#[tokio::test]
async fn test_flight_err_unknown_partition_id() {
async fn test_flight_err_batch_without_partition() {
let record_batch = lp_to_record_batch("table foo=1 1");
let mock_flight_client = Arc::new(
MockFlightClient::new([(
"addr1",
Ok(MockQueryData {
results: vec![Ok(record_batch)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: Some(1),
tombstone_max_sequence_number: Some(2),
},
)]),
batch_partition_ids: vec![42],
},
schema: schema().as_arrow(),
results: vec![Ok((
LowLevelMessage::RecordBatch(record_batch),
IngesterQueryResponseMetadata::default(),
))],
}),
)])
.await,
);
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::UnknownPartitionId { .. });
assert_matches!(err, Error::BatchWithoutPartition { .. });
}
#[tokio::test]
async fn test_flight_many_batches() {
let record_batch_1_1 = lp_to_record_batch("table foo=1 1");
let record_batch_1_2 = lp_to_record_batch("table foo=2 2");
let record_batch_1_2 = lp_to_record_batch("table bar=20,foo=2 2");
let record_batch_2_1 = lp_to_record_batch("table foo=3 3");
let record_batch_3_1 = lp_to_record_batch("table foo=4 4");
let record_batch_3_1 = lp_to_record_batch("table baz=40,foo=4 4");
let schema_1_1 = record_batch_1_1.schema();
let schema_1_2 = record_batch_1_2.schema();
let schema_2_1 = record_batch_2_1.schema();
let schema_3_1 = record_batch_3_1.schema();
let mock_flight_client = Arc::new(
MockFlightClient::new([
(
"addr1",
Ok(MockQueryData {
results: vec![
Ok(record_batch_1_1),
Ok(record_batch_1_2),
Ok(record_batch_2_1),
],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([
(
1,
PartitionStatus {
Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 1,
status: Some(PartitionStatus {
parquet_max_sequence_number: Some(11),
tombstone_max_sequence_number: Some(12),
},
),
(
2,
PartitionStatus {
}),
..Default::default()
},
)),
Ok((
LowLevelMessage::Schema(Arc::clone(&schema_1_1)),
IngesterQueryResponseMetadata::default(),
)),
Ok((
LowLevelMessage::RecordBatch(record_batch_1_1),
IngesterQueryResponseMetadata::default(),
)),
Ok((
LowLevelMessage::Schema(Arc::clone(&schema_1_2)),
IngesterQueryResponseMetadata::default(),
)),
Ok((
LowLevelMessage::RecordBatch(record_batch_1_2),
IngesterQueryResponseMetadata::default(),
)),
Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 2,
status: Some(PartitionStatus {
parquet_max_sequence_number: Some(21),
tombstone_max_sequence_number: Some(22),
},
),
]),
batch_partition_ids: vec![1, 1, 2],
},
schema: schema().as_arrow(),
}),
..Default::default()
},
)),
Ok((
LowLevelMessage::Schema(Arc::clone(&schema_2_1)),
IngesterQueryResponseMetadata::default(),
)),
Ok((
LowLevelMessage::RecordBatch(record_batch_2_1),
IngesterQueryResponseMetadata::default(),
)),
],
}),
),
(
"addr2",
Ok(MockQueryData {
results: vec![Ok(record_batch_3_1)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
3,
PartitionStatus {
parquet_max_sequence_number: Some(31),
tombstone_max_sequence_number: Some(32),
results: vec![
Ok((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: 3,
status: Some(PartitionStatus {
parquet_max_sequence_number: Some(31),
tombstone_max_sequence_number: Some(32),
}),
..Default::default()
},
)]),
batch_partition_ids: vec![3],
},
schema: schema().as_arrow(),
)),
Ok((
LowLevelMessage::Schema(Arc::clone(&schema_3_1)),
IngesterQueryResponseMetadata::default(),
)),
Ok((
LowLevelMessage::RecordBatch(record_batch_3_1),
IngesterQueryResponseMetadata::default(),
)),
],
}),
),
])
@ -1112,6 +1141,8 @@ mod tests {
Some(SequenceNumber::new(12))
);
assert_eq!(p1.batches.len(), 2);
assert_eq!(p1.batches[0].schema(), schema().as_arrow());
assert_eq!(p1.batches[1].schema(), schema().as_arrow());
let p2 = &partitions[1];
assert_eq!(p2.partition_id.get(), 2);
@ -1125,6 +1156,7 @@ mod tests {
Some(SequenceNumber::new(22))
);
assert_eq!(p2.batches.len(), 1);
assert_eq!(p2.batches[0].schema(), schema().as_arrow());
let p3 = &partitions[2];
assert_eq!(p3.partition_id.get(), 3);
@ -1138,6 +1170,7 @@ mod tests {
Some(SequenceNumber::new(32))
);
assert_eq!(p3.batches.len(), 1);
assert_eq!(p3.batches[0].schema(), schema().as_arrow());
}
async fn get_partitions(
@ -1155,7 +1188,9 @@ mod tests {
fn schema() -> Arc<Schema> {
Arc::new(
SchemaBuilder::new()
.influx_field("col", InfluxFieldType::Integer)
.influx_field("bar", InfluxFieldType::Float)
.influx_field("baz", InfluxFieldType::Float)
.influx_field("foo", InfluxFieldType::Float)
.timestamp()
.build()
.unwrap(),
@ -1168,28 +1203,20 @@ mod tests {
#[derive(Debug)]
struct MockQueryData {
results: Vec<Result<RecordBatch, FlightError>>,
app_metadata: IngesterQueryResponseMetadata,
schema: Arc<arrow::datatypes::Schema>,
results: Vec<Result<(LowLevelMessage, IngesterQueryResponseMetadata), FlightError>>,
}
#[async_trait]
impl QueryData for MockQueryData {
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
async fn next(
&mut self,
) -> Result<Option<(LowLevelMessage, IngesterQueryResponseMetadata)>, FlightError> {
if self.results.is_empty() {
Ok(None)
} else {
self.results.remove(0).map(Some)
}
}
fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
&self.app_metadata
}
fn schema(&self) -> Arc<arrow::datatypes::Schema> {
Arc::clone(&self.schema)
}
}
#[derive(Debug)]

View File

@ -11,12 +11,12 @@ use data_types::{
use datafusion::physical_plan::RecordBatchStream;
use datafusion_util::MemoryStream;
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::{StreamExt, TryStreamExt};
use futures::TryStreamExt;
use generated_types::{
influxdata::iox::ingester::v1::{IngesterQueryResponseMetadata, PartitionStatus},
ingester::IngesterQueryRequest,
};
use influxdb_iox_client::flight::Error as FlightError;
use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError};
use ingester::{
data::{IngesterData, IngesterQueryResponse, Persister, SequencerData},
lifecycle::LifecycleHandle,
@ -951,66 +951,79 @@ impl IngesterFlightClient for MockIngester {
..response
};
Ok(Box::new(QueryDataAdapter::new(response)))
Ok(Box::new(QueryDataAdapter::new(response).await))
}
}
/// Helper struct to present [`IngesterQueryResponse`] (produced by the ingester) as a
/// [`IngesterFlightClientQueryData`] (used by the querier) without doing any real gRPC IO.
#[derive(Debug)]
struct QueryDataAdapter {
response: IngesterQueryResponse,
app_metadata: IngesterQueryResponseMetadata,
messages: Box<dyn Iterator<Item = (LowLevelMessage, IngesterQueryResponseMetadata)> + Send>,
}
// Hand-written `Debug` impl: the `messages` field is a boxed `dyn Iterator` and cannot derive
// `Debug`, so we print only the struct name and elide the fields.
impl std::fmt::Debug for QueryDataAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QueryDataAdapter").finish_non_exhaustive()
    }
}
impl QueryDataAdapter {
/// Create new adapter.
///
/// This pre-calculates some data structure that we are going to need later.
fn new(response: IngesterQueryResponse) -> Self {
let app_metadata = IngesterQueryResponseMetadata {
unpersisted_partitions: response
.unpersisted_partitions
.iter()
.map(|(id, status)| {
(
id.get(),
PartitionStatus {
parquet_max_sequence_number: status
.parquet_max_sequence_number
.map(|x| x.get()),
tombstone_max_sequence_number: status
.tombstone_max_sequence_number
.map(|x| x.get()),
},
)
})
.collect(),
batch_partition_ids: response
.batch_partition_ids
.iter()
.map(|id| id.get())
.collect(),
};
async fn new(mut response: IngesterQueryResponse) -> Self {
let mut partitions: BTreeMap<_, _> = response
.unpersisted_partitions
.into_iter()
.map(|(partition_id, status)| (partition_id, (status, vec![])))
.collect();
let mut id_it = response.batch_partition_ids.into_iter();
while let Some(batch) = response.data.try_next().await.unwrap() {
let partition_id = id_it.next().unwrap();
partitions.get_mut(&partition_id).unwrap().1.push(batch);
}
let mut messages = vec![];
for (partition_id, (status, batches)) in partitions {
let status = PartitionStatus {
parquet_max_sequence_number: status.parquet_max_sequence_number.map(|x| x.get()),
tombstone_max_sequence_number: status
.tombstone_max_sequence_number
.map(|x| x.get()),
};
messages.push((
LowLevelMessage::None,
IngesterQueryResponseMetadata {
partition_id: partition_id.get(),
status: Some(status),
..Default::default()
},
));
for batch in batches {
messages.push((
LowLevelMessage::Schema(batch.schema()),
IngesterQueryResponseMetadata::default(),
));
messages.push((
LowLevelMessage::RecordBatch(batch),
IngesterQueryResponseMetadata::default(),
));
}
}
Self {
response,
app_metadata,
messages: Box::new(messages.into_iter()),
}
}
}
#[async_trait]
impl IngesterFlightClientQueryData for QueryDataAdapter {
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
Ok(self.response.data.next().await.map(|x| x.unwrap()))
}
fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
&self.app_metadata
}
fn schema(&self) -> Arc<arrow::datatypes::Schema> {
self.response.schema.as_arrow()
async fn next(
&mut self,
) -> Result<Option<(LowLevelMessage, IngesterQueryResponseMetadata)>, FlightError> {
Ok(self.messages.next())
}
}