feat: correctly dismantle ingester gRPC response in querier (#4323)

This now correctly processes record batches for the different
partitions. The actual code change is rather small, but it requires
substantial test infrastructure.
Marco Neumann 2022-04-19 13:09:40 +02:00 committed by GitHub
parent 5b48675435
commit 3588a06647
2 changed files with 458 additions and 55 deletions
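
The gist of the change: the ingester's response metadata carries unpersisted_partitions (partition ID -> status) plus batch_partition_ids (one partition ID per record batch, in stream order), and the querier now buckets each streamed batch into its partition instead of lumping all batches together. Below is a minimal, self-contained sketch of that bookkeeping, using simplified stand-in types (a trimmed PartitionStatus, plain strings instead of Arrow RecordBatches, string errors instead of the snafu variants) rather than the real generated structs:

use std::collections::HashMap;

// Simplified stand-ins for the real generated protobuf / Arrow types.
#[derive(Debug, Clone, Default)]
struct PartitionStatus {
    parquet_max_sequence_number: Option<i64>,
    tombstone_max_sequence_number: Option<i64>,
}
type Batch = &'static str; // stand-in for arrow::record_batch::RecordBatch

// Bucket streamed batches into partitions using the response metadata.
// Mirrors the MissingPartitionId / TooManyPartitionIds / UnknownPartitionId
// checks from the diff, but with plain string errors.
fn dismantle(
    unpersisted_partitions: HashMap<i64, PartitionStatus>,
    batch_partition_ids: &[i64],
    batches: Vec<Batch>,
) -> Result<HashMap<i64, (PartitionStatus, Vec<Batch>)>, String> {
    // Every known partition starts out empty; empty partitions are kept on
    // purpose because downstream code still needs their metadata.
    let mut partitions: HashMap<i64, (PartitionStatus, Vec<Batch>)> =
        unpersisted_partitions
            .into_iter()
            .map(|(id, status)| (id, (status, Vec::new())))
            .collect();

    // Exactly one partition ID per batch, in stream order.
    if batch_partition_ids.len() != batches.len() {
        return Err(format!(
            "got {} partition IDs for {} batches",
            batch_partition_ids.len(),
            batches.len()
        ));
    }

    for (pos, batch) in batches.into_iter().enumerate() {
        let id = batch_partition_ids[pos];
        partitions
            .get_mut(&id)
            .ok_or_else(|| format!("batch {pos} references unknown partition {id}"))?
            .1
            .push(batch);
    }

    Ok(partitions)
}

fn main() {
    let meta = HashMap::from([
        (1, PartitionStatus::default()),
        (2, PartitionStatus::default()),
    ]);
    let by_partition = dismantle(meta, &[1, 1, 2], vec!["b0", "b1", "b2"]).unwrap();
    assert_eq!(by_partition[&1].1.len(), 2);
    assert_eq!(by_partition[&2].1.len(), 1);
}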


@@ -1,5 +1,10 @@
use std::fmt::Debug;
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use async_trait::async_trait;
use client_util::connection;
use data_types2::IngesterQueryRequest;
@@ -43,7 +48,7 @@ pub trait FlightClient: Debug + Send + Sync + 'static {
&self,
ingester_address: &str,
request: IngesterQueryRequest,
) -> Result<PerformQuery<IngesterQueryResponseMetadata>, Error>;
) -> Result<Box<dyn QueryData>, Error>;
}
/// Default [`FlightClient`] implementation that uses a real client.
@@ -88,7 +93,7 @@ impl FlightClient for FlightClientImpl {
&self,
ingester_addr: &str,
request: IngesterQueryRequest,
) -> Result<PerformQuery<IngesterQueryResponseMetadata>, Error> {
) -> Result<Box<dyn QueryData>, Error> {
// TODO maybe cache this connection
let mut client = self.connect(ingester_addr).await?;
@@ -96,6 +101,56 @@ impl FlightClient for FlightClientImpl {
let request: flight::generated_types::IngesterQueryRequest =
request.try_into().context(CreatingRequestSnafu)?;
client.perform_query(request).await.context(FlightSnafu)
let perform_query = client.perform_query(request).await.context(FlightSnafu)?;
Ok(Box::new(perform_query))
}
}
/// Data that is returned by an ingester gRPC query.
///
/// This is mostly the same as [`PerformQuery`] but allows easier mocking.
#[async_trait]
pub trait QueryData: Debug + Send + 'static {
/// Returns the next `RecordBatch` available for this query, or `None` if
/// there are no further results available.
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError>;
/// App metadata that was part of the response.
fn app_metadata(&self) -> &IngesterQueryResponseMetadata;
/// Schema.
fn schema(&self) -> Arc<Schema>;
}
#[async_trait]
impl<T> QueryData for Box<T>
where
T: QueryData + ?Sized,
{
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
self.deref_mut().next().await
}
fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
self.deref().app_metadata()
}
fn schema(&self) -> Arc<Schema> {
self.deref().schema()
}
}
#[async_trait]
impl QueryData for PerformQuery<IngesterQueryResponseMetadata> {
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
self.next().await
}
fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
self.app_metadata()
}
fn schema(&self) -> Arc<Schema> {
self.schema()
}
}


@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use async_trait::async_trait;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, SequenceNumber, TableSummary,
ChunkAddr, ChunkId, ChunkOrder, IngesterQueryRequest, PartitionId, SequenceNumber, TableSummary,
};
use datafusion_util::MemoryStream;
use observability_deps::tracing::{debug, trace};
@@ -13,7 +13,7 @@ use query::{
QueryChunk, QueryChunkError, QueryChunkMeta,
};
use schema::{selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use self::{
flight_client::{Error as FlightClientError, FlightClient, FlightClientImpl, FlightError},
@@ -48,6 +48,34 @@ pub enum Error {
ingester_address: String,
source: FlightClientError,
},
#[snafu(display("Batch {} is missing partition id from '{}'", pos, ingester_address))]
MissingPartitionId {
pos: usize,
ingester_address: String,
},
#[snafu(display(
"Got too many partition IDs from '{}', expected {} but got {}",
ingester_address,
expected,
actual
))]
TooManyPartitionIds {
expected: usize,
actual: usize,
ingester_address: String,
},
#[snafu(display(
"Got batch for partition id {} that was not marked as unpersisted from '{}'",
partition_id,
ingester_address,
))]
UnknownPartitionId {
partition_id: i64,
ingester_address: String,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -116,7 +144,7 @@ impl IngesterConnection for IngesterConnectionImpl {
predicate: &Predicate,
expected_schema: Arc<Schema>,
) -> Result<Vec<Arc<IngesterPartition>>> {
let mut partitions = vec![];
let mut ingester_partitions = vec![];
// TODO make these requests in parallel
for ingester_address in &self.ingester_addresses {
@@ -147,30 +175,71 @@ impl IngesterConnection for IngesterConnectionImpl {
}
let mut perform_query = query_res.context(RemoteQuerySnafu { ingester_address })?;
// Gather up all the results (todo pass along partition information in metadata)
let batches = perform_query.collect().await.expect("collecting");
debug!(
num_batches = batches.len(),
"Received batches from ingester"
// read unpersisted partitions
// map partition_id -> (PartitionStatus, Vec<RecordBatch>)
let mut partitions: HashMap<_, _> = perform_query
.app_metadata()
.unpersisted_partitions
.iter()
.map(|(id, state)| (*id, (state.clone(), vec![])))
.collect();
// sort batches into partitions
let mut num_batches = 0usize;
let partition_ids = perform_query.app_metadata().batch_partition_ids.clone();
while let Some(batch) = perform_query
.next()
.await
.map_err(|source| FlightClientError::Flight { source })
.context(RemoteQuerySnafu { ingester_address })?
{
let partition_id =
*partition_ids
.get(num_batches)
.context(MissingPartitionIdSnafu {
pos: num_batches,
ingester_address,
})?;
partitions
.get_mut(&partition_id)
.context(UnknownPartitionIdSnafu {
partition_id,
ingester_address,
})?
.1
.push(batch);
num_batches += 1;
}
debug!(num_batches, "Received batches from ingester");
trace!(?partitions, schema=?perform_query.schema(), "Detailed from ingester");
ensure!(
num_batches == partition_ids.len(),
TooManyPartitionIdsSnafu {
actual: partition_ids.len(),
expected: num_batches,
ingester_address
},
);
trace!(?batches, schema=?perform_query.schema(), "Detailed from ingester");
// TODO split chunks up based on table partition, gather
// sequence numbers per partition
let parquet_max_sequence_number = None;
let ingester_partition = IngesterPartition::try_new(
ChunkId::new(),
Arc::clone(&namespace_name),
Arc::clone(&table_name),
Arc::clone(&expected_schema),
parquet_max_sequence_number,
batches,
)?;
partitions.push(Arc::new(ingester_partition));
for (partition_id, (state, batches)) in partitions {
// do NOT filter out empty partitions, because the caller of this function needs the attached metadata
// to select the right parquet files and tombstones
let ingester_partition = IngesterPartition::try_new(
ChunkId::new(),
Arc::clone(&namespace_name),
Arc::clone(&table_name),
PartitionId::new(partition_id),
Arc::clone(&expected_schema),
state.parquet_max_sequence_number.map(SequenceNumber::new),
state.tombstone_max_sequence_number.map(SequenceNumber::new),
batches,
)?;
ingester_partitions.push(Arc::new(ingester_partition));
}
}
Ok(partitions)
ingester_partitions.sort_by_key(|p| p.partition_id);
Ok(ingester_partitions)
}
}
@@ -192,11 +261,12 @@ pub struct IngesterPartition {
chunk_id: ChunkId,
namespace_name: Arc<str>,
table_name: Arc<str>,
partition_id: PartitionId,
schema: Arc<Schema>,
/// Maximum sequence number of persisted data for this partition in the ingester
/// TODO make this not an option
parquet_max_sequence_number: Option<SequenceNumber>,
tombstone_max_sequence_number: Option<SequenceNumber>,
batches: Vec<RecordBatch>,
}
@@ -204,12 +274,15 @@ pub struct IngesterPartition {
impl IngesterPartition {
/// Creates a new IngesterPartition, translating the passed
/// `RecordBatches` into the correct types
#[allow(clippy::too_many_arguments)]
pub fn try_new(
chunk_id: ChunkId,
namespace_name: Arc<str>,
table_name: Arc<str>,
partition_id: PartitionId,
expected_schema: Arc<Schema>,
parquet_max_sequence_number: Option<SequenceNumber>,
tombstone_max_sequence_number: Option<SequenceNumber>,
batches: Vec<RecordBatch>,
) -> Result<Self> {
// ensure that the schema of the batches matches the required
@@ -227,8 +300,10 @@ impl IngesterPartition {
chunk_id,
namespace_name,
table_name,
partition_id,
schema: expected_schema,
parquet_max_sequence_number,
tombstone_max_sequence_number,
batches,
})
}
@@ -264,7 +339,7 @@ impl QueryChunk for IngesterPartition {
ChunkAddr {
db_name: Arc::clone(&self.namespace_name),
table_name: Arc::clone(&self.table_name),
partition_key: Arc::from("TODO PARTITION KEY"),
partition_key: Arc::from(self.partition_id.to_string()),
chunk_id: self.chunk_id,
}
}
@@ -377,20 +452,20 @@ fn ensure_schema(batch: RecordBatch, expected_schema: &Schema) -> Result<RecordB
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use arrow::{
array::{ArrayRef, DictionaryArray, StringArray, TimestampNanosecondArray},
datatypes::Int32Type,
};
use assert_matches::assert_matches;
use influxdb_iox_client::flight::{
generated_types::IngesterQueryResponseMetadata, PerformQuery,
};
use schema::builder::SchemaBuilder;
use generated_types::influxdata::iox::ingester::v1::PartitionStatus;
use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::{builder::SchemaBuilder, InfluxFieldType};
use tokio::sync::Mutex;
use super::*;
use super::{flight_client::QueryData, *};
#[tokio::test]
async fn test_flight_handshake_error() {
@@ -432,23 +507,302 @@ mod tests {
assert!(partitions.is_empty());
}
#[tokio::test]
async fn test_flight_stream_error() {
let mock_flight_client = Arc::new(MockFlightClient::from([(
"addr1",
Ok(MockQueryData {
results: vec![Err(FlightError::GrpcError(tonic::Status::internal(
"don't know",
)))],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::new(),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
)]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::RemoteQuery { .. });
}
#[tokio::test]
async fn test_flight_no_partitions() {
let mock_flight_client = Arc::new(MockFlightClient::from([(
"addr1",
Ok(MockQueryData {
results: vec![],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::new(),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
)]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert!(partitions.is_empty());
}
#[tokio::test]
async fn test_flight_no_batches() {
let mock_flight_client = Arc::new(MockFlightClient::from([(
"addr1",
Ok(MockQueryData {
results: vec![],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None,
},
)]),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
)]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert_eq!(partitions.len(), 1);
let p = &partitions[0];
assert_eq!(p.partition_id.get(), 1);
assert_eq!(p.parquet_max_sequence_number, None);
assert_eq!(p.tombstone_max_sequence_number, None);
assert_eq!(p.batches.len(), 0);
}
#[tokio::test]
async fn test_flight_err_missing_partition_id() {
let record_batch = lp_to_record_batch("table foo=1 1");
let mock_flight_client = Arc::new(MockFlightClient::from([(
"addr1",
Ok(MockQueryData {
results: vec![Ok(record_batch)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: Some(1),
tombstone_max_sequence_number: Some(2),
},
)]),
batch_partition_ids: vec![],
},
schema: schema().as_arrow(),
}),
)]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::MissingPartitionId { .. });
}
#[tokio::test]
async fn test_flight_err_too_many_partition_ids() {
let record_batch = lp_to_record_batch("table foo=1 1");
let mock_flight_client = Arc::new(MockFlightClient::from([(
"addr1",
Ok(MockQueryData {
results: vec![Ok(record_batch)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: Some(1),
tombstone_max_sequence_number: Some(2),
},
)]),
batch_partition_ids: vec![1, 2],
},
schema: schema().as_arrow(),
}),
)]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::TooManyPartitionIds { .. });
}
#[tokio::test]
async fn test_flight_err_unknown_partition_id() {
let record_batch = lp_to_record_batch("table foo=1 1");
let mock_flight_client = Arc::new(MockFlightClient::from([(
"addr1",
Ok(MockQueryData {
results: vec![Ok(record_batch)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
1,
PartitionStatus {
parquet_max_sequence_number: Some(1),
tombstone_max_sequence_number: Some(2),
},
)]),
batch_partition_ids: vec![42],
},
schema: schema().as_arrow(),
}),
)]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let err = get_partitions(&ingester_conn).await.unwrap_err();
assert_matches!(err, Error::UnknownPartitionId { .. });
}
#[tokio::test]
async fn test_flight_many_batches() {
let record_batch_1_1 = lp_to_record_batch("table foo=1 1");
let record_batch_1_2 = lp_to_record_batch("table foo=2 2");
let record_batch_2_1 = lp_to_record_batch("table foo=3 3");
let record_batch_3_1 = lp_to_record_batch("table foo=4 4");
let mock_flight_client = Arc::new(MockFlightClient::from([
(
"addr1",
Ok(MockQueryData {
results: vec![
Ok(record_batch_1_1),
Ok(record_batch_1_2),
Ok(record_batch_2_1),
],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([
(
1,
PartitionStatus {
parquet_max_sequence_number: Some(11),
tombstone_max_sequence_number: Some(12),
},
),
(
2,
PartitionStatus {
parquet_max_sequence_number: Some(21),
tombstone_max_sequence_number: Some(22),
},
),
]),
batch_partition_ids: vec![1, 1, 2],
},
schema: schema().as_arrow(),
}),
),
(
"addr2",
Ok(MockQueryData {
results: vec![Ok(record_batch_3_1)],
app_metadata: IngesterQueryResponseMetadata {
unpersisted_partitions: BTreeMap::from([(
3,
PartitionStatus {
parquet_max_sequence_number: Some(31),
tombstone_max_sequence_number: Some(32),
},
)]),
batch_partition_ids: vec![3],
},
schema: schema().as_arrow(),
}),
),
]));
let ingester_conn = mock_flight_client.ingester_conn().await;
let partitions = get_partitions(&ingester_conn).await.unwrap();
assert_eq!(partitions.len(), 3);
let p1 = &partitions[0];
assert_eq!(p1.partition_id.get(), 1);
assert_eq!(
p1.parquet_max_sequence_number,
Some(SequenceNumber::new(11))
);
assert_eq!(
p1.tombstone_max_sequence_number,
Some(SequenceNumber::new(12))
);
assert_eq!(p1.batches.len(), 2);
let p2 = &partitions[1];
assert_eq!(p2.partition_id.get(), 2);
assert_eq!(
p2.parquet_max_sequence_number,
Some(SequenceNumber::new(21))
);
assert_eq!(
p2.tombstone_max_sequence_number,
Some(SequenceNumber::new(22))
);
assert_eq!(p2.batches.len(), 1);
let p3 = &partitions[2];
assert_eq!(p3.partition_id.get(), 3);
assert_eq!(
p3.parquet_max_sequence_number,
Some(SequenceNumber::new(31))
);
assert_eq!(
p3.tombstone_max_sequence_number,
Some(SequenceNumber::new(32))
);
assert_eq!(p3.batches.len(), 1);
}
async fn get_partitions(
ingester_conn: &IngesterConnectionImpl,
) -> Result<Vec<Arc<IngesterPartition>>, Error> {
let namespace = Arc::from("namespace");
let table = Arc::from("table");
let columns = vec![String::from("col")];
let schema = Arc::new(SchemaBuilder::new().build().unwrap());
let schema = schema();
ingester_conn
.partitions(namespace, table, columns, &Predicate::default(), schema)
.await
}
fn schema() -> Arc<Schema> {
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
)
}
fn lp_to_record_batch(lp: &str) -> RecordBatch {
lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap()
}
#[derive(Debug)]
struct MockQueryData {
results: Vec<Result<RecordBatch, FlightError>>,
app_metadata: IngesterQueryResponseMetadata,
schema: Arc<arrow::datatypes::Schema>,
}
#[async_trait]
impl QueryData for MockQueryData {
async fn next(&mut self) -> Result<Option<RecordBatch>, FlightError> {
if self.results.is_empty() {
Ok(None)
} else {
self.results.remove(0).map(Some)
}
}
fn app_metadata(&self) -> &IngesterQueryResponseMetadata {
&self.app_metadata
}
fn schema(&self) -> Arc<arrow::datatypes::Schema> {
Arc::clone(&self.schema)
}
}
#[derive(Debug)]
struct MockFlightClient {
responses: Mutex<
HashMap<String, Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError>>,
>,
responses: Mutex<HashMap<String, Result<MockQueryData, FlightClientError>>>,
}
impl MockFlightClient {
@@ -461,20 +815,10 @@ mod tests {
}
}
impl<const N: usize>
From<
[(
&'static str,
Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError>,
); N],
> for MockFlightClient
impl<const N: usize> From<[(&'static str, Result<MockQueryData, FlightClientError>); N]>
for MockFlightClient
{
fn from(
responses: [(
&'static str,
Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError>,
); N],
) -> Self {
fn from(responses: [(&'static str, Result<MockQueryData, FlightClientError>); N]) -> Self {
Self {
responses: Mutex::new(
responses
@@ -492,12 +836,13 @@ mod tests {
&self,
ingester_address: &str,
_request: IngesterQueryRequest,
) -> Result<PerformQuery<IngesterQueryResponseMetadata>, FlightClientError> {
) -> Result<Box<dyn QueryData>, FlightClientError> {
self.responses
.lock()
.await
.remove(ingester_address)
.expect("Response not mocked")
.map(|query_data| Box::new(query_data) as _)
}
}
@@ -514,13 +859,16 @@ mod tests {
for case in cases {
let parquet_max_sequence_number = None;
let tombstone_max_sequence_number = None;
// Construct a partition and ensure it doesn't error
let ingester_partition = IngesterPartition::try_new(
ChunkId::new(),
"ns".into(),
"table".into(),
PartitionId::new(1),
Arc::clone(&expected_schema),
parquet_max_sequence_number,
tombstone_max_sequence_number,
vec![case],
)
.unwrap();