perf: only send metadata for relevant partitions
When partition pruning is possible, the ingester skips sending the data for partitions that have no effect on the query outcome. This commit does the same for the partition metadata: these frames can form a significant portion of the query response when the row count is low, and for pruned partitions they have no bearing on the query result.
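Conceptually, the change replaces "emit an empty, metadata-only response for a pruned partition" with "emit nothing at all for it". The sketch below illustrates that shape only; the Partition and Response types and the responses_for_query function are invented stand-ins for this example, not the ingester's real API.

/// Illustrative stand-in for a buffered partition (hypothetical type).
struct Partition {
    id: u64,
    rows: Vec<String>,
}

/// Illustrative stand-in for the per-partition response frame
/// (hypothetical type, not the real PartitionResponse).
struct Response {
    id: u64,
    rows: Vec<String>,
}

/// Builds one response per partition that can affect the query outcome.
/// Partitions ruled out by the pruning predicate yield no response at all,
/// so neither their data nor their metadata is serialised.
fn responses_for_query(
    partitions: Vec<Partition>,
    is_pruned: impl Fn(&Partition) -> bool,
) -> Vec<Response> {
    partitions
        .into_iter()
        .filter_map(|p| {
            if is_pruned(&p) {
                // Previously this arm would return an empty (metadata-only)
                // response; now the partition is skipped entirely.
                return None;
            }
            Some(Response { id: p.id, rows: p.rows })
        })
        .collect()
}

The real change below applies the same idea by switching the per-partition map to a filter_map and returning None once the pruning predicate rules a partition out.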
parent 5232cfea1d
commit 7bd6e90830
@@ -745,6 +745,108 @@ mod tests {
             ]
         );
 
+    /// Ensure partition pruning during query execution also prunes metadata
+    /// frames.
+    ///
+    /// Individual frames are fast to serialise, but large numbers of frames can
+    /// add significant query overhead, particularly for queries returning small
+    /// numbers of rows where the metadata becomes a significant portion of the
+    /// response.
+    #[tokio::test]
+    async fn test_partition_metadata_pruning() {
+        let partition_provider = Arc::new(
+            MockPartitionProvider::default()
+                .with_partition(
+                    PartitionDataBuilder::new()
+                        .with_partition_id(ARBITRARY_PARTITION_ID)
+                        .with_partition_key("madrid".into())
+                        .build(),
+                )
+                .with_partition(
+                    PartitionDataBuilder::new()
+                        .with_partition_id(PARTITION2_ID)
+                        .with_partition_key("asturias".into())
+                        .build(),
+                ),
+        );
+
+        // Construct a partition template suitable for pruning on the "region"
+        // tag.
+        let table_provider = Arc::new(MockTableProvider::new(TableMetadata::new_for_testing(
+            ARBITRARY_TABLE_NAME.clone(),
+            test_table_partition_override(vec![TemplatePart::TagValue("region")]),
+        )));
+
+        // Init the buffer tree
+        let buf = BufferTree::new(
+            Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
+            table_provider,
+            partition_provider,
+            Arc::new(MockPostWriteObserver::default()),
+            Arc::new(metric::Registry::default()),
+        );
+
+        // Write to two regions
+        buf.apply(IngestOp::Write(make_write_op(
+            &PartitionKey::from("madrid"),
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
+            0,
+            &format!(
+                r#"{},region=madrid temp=35 4242424242"#,
+                &*ARBITRARY_TABLE_NAME
+            ),
+            None,
+        )))
+        .await
+        .expect("failed to perform write");
+
+        buf.apply(IngestOp::Write(make_write_op(
+            &PartitionKey::from("asturias"),
+            ARBITRARY_NAMESPACE_ID,
+            &ARBITRARY_TABLE_NAME,
+            ARBITRARY_TABLE_ID,
+            0,
+            &format!(
+                r#"{},region=asturias temp=35 4242424242"#,
+                &*ARBITRARY_TABLE_NAME
+            ),
+            None,
+        )))
+        .await
+        .expect("failed to perform write");
+
+        // Construct a predicate suitable for pruning partitions based on the
+        // region / partition template.
+        let predicate = Some(Predicate::new().with_expr(col("region").eq(lit(
+            ScalarValue::Dictionary(
+                Box::new(DataType::Int32),
+                Box::new(ScalarValue::from("asturias")),
+            ),
+        ))));
+
+        // Execute the query and count the number of partitions that are
+        // returned (either data, or metadata).
+        let partition_count = buf
+            .query_exec(
+                ARBITRARY_NAMESPACE_ID,
+                ARBITRARY_TABLE_ID,
+                OwnedProjection::default(),
+                None,
+                predicate,
+            )
+            .await
+            .expect("query should succeed")
+            .into_partition_stream()
+            .count()
+            .await;
+
+        // Because the data in the "madrid" partition was pruned out, the
+        // metadata should not be sent either.
+        assert_eq!(partition_count, 1);
+    }
+
     /// Assert that multiple writes to a single namespace/table results in a
     /// single namespace being created, and matching metrics.
     #[tokio::test]
@@ -270,7 +270,7 @@ where
 
         // Gather the partition data from all of the partitions in this table.
         let span = SpanRecorder::new(span);
-        let partitions = self.partitions().into_iter().map(move |p| {
+        let partitions = self.partitions().into_iter().filter_map(move |p| {
             let mut span = span.child("partition read");
 
             let (id, hash_id, completed_persistence_count, data, partition_key) = {
@@ -303,15 +303,24 @@ where
                })
                .unwrap_or_default()
            {
-               return PartitionResponse::new(
-                   vec![],
-                   id,
-                   hash_id,
-                   completed_persistence_count,
-               );
+               // This partition will never contain any data that would
+               // form part of the query response.
+               //
+               // Because this is true of buffered data, it is also
+               // true of the persisted data, and therefore sending the
+               // persisted file count metadata is useless because the
+               // querier would never utilise the persisted files as
+               // part of this query.
+               //
+               // This avoids sending O(n) metadata frames for queries
+               // that may only touch one or two actual frames. The N
+               // partition count grows over the lifetime of the
+               // ingester as more partitions are created, and while
+               // fast to serialise individually, the sequentially-sent
+               // N metadata frames add up.
+               return None;
            }
 
            // Project the data if necessary
            PartitionResponse::new(
                data.into_record_batches(),
                id,
@@ -323,7 +332,7 @@ where
            };
 
            span.ok("read partition data");
-           ret
+           Some(ret)
        });
 
        Ok(PartitionStream::new(futures::stream::iter(partitions)))