test: correctly decode ingester reponses in end2end tests

pull/24376/head
Marco Neumann 2023-07-03 12:15:43 +02:00 committed by Dom Dwyer
parent b1a4e3955e
commit 70b44f78ee
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
4 changed files with 102 additions and 46 deletions

View File

@ -1323,10 +1323,15 @@ async fn assert_ingester_contains_results(
.await
.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid;
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("at least one ingester partition");
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid;
assert!(!ingester_uuid.is_empty());
assert_batches_sorted_eq!(expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(expected, &ingester_partition.record_batches);
}
#[tokio::test]

View File

@ -45,7 +45,14 @@ async fn persist_on_demand() {
.await
.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid;
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid;
assert!(!ingester_uuid.is_empty());
let expected = [
@ -55,7 +62,7 @@ async fn persist_on_demand() {
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches);
}
.boxed()
})),
@ -83,8 +90,15 @@ async fn persist_on_demand() {
.await
.unwrap();
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let num_files_persisted =
ingester_response.app_metadata.completed_persistence_count;
ingester_partition.app_metadata.completed_persistence_count;
assert_eq!(num_files_persisted, 1);
}
.boxed()
@ -127,11 +141,17 @@ async fn ingester_flight_api() {
.query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection())
.await
.unwrap();
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let ingester_uuid = ingester_response.app_metadata.ingester_uuid.clone();
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid.clone();
assert!(!ingester_uuid.is_empty());
let schema = ingester_response.schema.unwrap();
let schema = ingester_partition.schema.unwrap();
let expected = [
"+------+------+--------------------------------+-----+",
@ -141,11 +161,11 @@ async fn ingester_flight_api() {
"| B | A | 1970-01-01T00:00:00.001234567Z | 84 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches);
// Also ensure that the schema of the batches matches what is
// reported by the performed_query.
ingester_response
ingester_partition
.record_batches
.iter()
.enumerate()
@ -158,7 +178,13 @@ async fn ingester_flight_api() {
.query_ingester(query.clone(), cluster.ingester().ingester_grpc_connection())
.await
.unwrap();
assert_eq!(ingester_response.app_metadata.ingester_uuid, ingester_uuid);
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
assert_eq!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid);
// Restart the ingesters
cluster.restart_ingesters().await;
@ -173,7 +199,13 @@ async fn ingester_flight_api() {
.query_ingester(query, cluster.ingester().ingester_grpc_connection())
.await
.unwrap();
assert_ne!(ingester_response.app_metadata.ingester_uuid, ingester_uuid);
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
assert_ne!(ingester_partition.app_metadata.ingester_uuid, ingester_uuid);
}
#[tokio::test]
@ -296,7 +328,12 @@ async fn ingester_partition_pruning() {
"| 1.0 | v1a | v2b | v3b | 1970-01-01T00:00:00.000000011Z |",
"+-----+------+------+------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
let record_batches = ingester_response
.partitions
.into_iter()
.flat_map(|p| p.record_batches)
.collect::<Vec<_>>();
assert_batches_sorted_eq!(&expected, &record_batches);
}
.boxed()
})));

View File

@ -193,7 +193,14 @@ async fn write_replication() {
.await
.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid;
assert_eq!(ingester_response.partitions.len(), 1);
let ingester_partition = ingester_response
.partitions
.into_iter()
.next()
.expect("just checked len");
let ingester_uuid = ingester_partition.app_metadata.ingester_uuid;
assert!(!ingester_uuid.is_empty());
let expected = [
@ -212,7 +219,7 @@ async fn write_replication() {
"| A | B | 1970-01-01T00:00:00.000000020Z | 20 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
assert_batches_sorted_eq!(&expected, &ingester_partition.record_batches);
}
.boxed()
})));

View File

@ -481,27 +481,43 @@ impl MiniCluster {
.await?
.into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{msg:?}");
let schema = next_message(&mut performed_query)
.await
.map(|(msg, _)| unwrap_schema(msg));
let mut record_batches = vec![];
while let Some((msg, _md)) = next_message(&mut performed_query).await {
if matches!(msg, DecodedPayload::None | DecodedPayload::Schema(_)) {
continue;
let mut partitions = vec![];
let mut current_partition = None;
while let Some((msg, app_metadata)) = next_message(&mut performed_query).await {
match msg {
DecodedPayload::None => {
if let Some(p) = std::mem::take(&mut current_partition) {
partitions.push(p);
}
let batch = unwrap_record_batch(msg);
record_batches.push(batch);
}
Ok(IngesterResponse {
current_partition = Some(IngesterResponsePartition {
app_metadata,
schema,
record_batches,
})
schema: None,
record_batches: vec![],
});
}
DecodedPayload::Schema(schema) => {
let current_partition =
current_partition.as_mut().expect("schema w/o partition");
assert!(
current_partition.schema.is_none(),
"got two schemas for a single partition"
);
current_partition.schema = Some(schema);
}
DecodedPayload::RecordBatch(batch) => {
let current_partition =
current_partition.as_mut().expect("batch w/o partition");
assert!(current_partition.schema.is_some(), "batch w/o schema");
current_partition.record_batches.push(batch);
}
}
}
if let Some(p) = current_partition {
partitions.push(p);
}
Ok(IngesterResponse { partitions })
}
/// Ask all of the ingesters to persist their data.
@ -592,6 +608,11 @@ impl MiniCluster {
/// Gathers data from ingester Flight queries
#[derive(Debug)]
pub struct IngesterResponse {
pub partitions: Vec<IngesterResponsePartition>,
}
#[derive(Debug)]
pub struct IngesterResponsePartition {
pub app_metadata: IngesterQueryResponseMetadata,
pub schema: Option<SchemaRef>,
pub record_batches: Vec<RecordBatch>,
@ -711,17 +732,3 @@ async fn next_message(
Some((payload, app_metadata))
}
fn unwrap_schema(msg: DecodedPayload) -> SchemaRef {
match msg {
DecodedPayload::Schema(s) => s,
_ => panic!("Unexpected message type: {msg:?}"),
}
}
fn unwrap_record_batch(msg: DecodedPayload) -> RecordBatch {
match msg {
DecodedPayload::RecordBatch(b) => b,
_ => panic!("Unexpected message type: {msg:?}"),
}
}