refactor: gather column ranges after decoding (#8090)

We need to decode the ingester data in a serial fashion (since it is a
data stream). Cache access during that phase is costly since we cannot
parallize that. To avoid that, we gather the column ranges AFTER
decoding and calculate the chunk statistics accordingly.

This refactoring also removes the partition sort key from ingester
partitions since they are not required anymore. They are a leftover of
the old physical query planning. They were not marked as "unused" since
they were used by some test code.

Required for #8089.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-06-27 16:44:06 +02:00 committed by GitHub
parent d9ce92dad1
commit 9d8b620cd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 122 deletions

View File

@ -9,18 +9,16 @@ use self::{
use crate::{
cache::{namespace::CachedTable, CatalogCache},
df_stats::{create_chunk_statistics, ColumnRanges},
CONCURRENT_CHUNK_CREATION_JOBS,
};
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
use arrow_flight::decode::DecodedPayload;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig, BackoffError};
use client_util::connection;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionHashId, PartitionId,
TimestampMinMax,
};
use data_types::{ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionHashId, PartitionId};
use datafusion::{error::DataFusionError, physical_plan::Statistics};
use futures::{stream::FuturesUnordered, TryStreamExt};
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use ingester_query_grpc::{
encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata,
IngesterQueryRequest,
@ -34,6 +32,7 @@ use iox_time::{Time, TimeProvider};
use metric::{DurationHistogram, Metric};
use observability_deps::tracing::{debug, trace, warn};
use predicate::Predicate;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use schema::{sort::SortKey, Projection, Schema};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
@ -496,10 +495,10 @@ async fn execute(
span_recorder.child_span("IngesterStreamDecoder"),
);
for (msg, md) in messages {
decoder.register(msg, md).await?;
decoder.register(msg, md)?;
}
decoder.finalize()
decoder.finalize().await
}
/// Helper to disassemble the data from the ingester Apache Flight arrow stream.
@ -564,7 +563,7 @@ impl IngesterStreamDecoder {
}
/// Register a new message and its metadata from the Flight stream.
async fn register(
fn register(
&mut self,
msg: DecodedPayload,
md: IngesterQueryResponseMetadata,
@ -589,25 +588,6 @@ impl IngesterStreamDecoder {
},
);
// Use a temporary empty partition sort key. We are going to fetch this AFTER we
// know all chunks because then we are able to detect all relevant primary key
// columns that the sort key must cover.
let partition_sort_key = None;
// If the partition does NOT yet exist within the catalog, this is OK. We can deal without the ranges,
// the chunk pruning will not be as efficient though.
let partition_column_ranges = self
.catalog_cache
.partition()
.column_ranges(
Arc::clone(&self.cached_table),
partition_id,
self.span_recorder
.child_span("cache GET partition column ranges"),
)
.await
.unwrap_or_default();
let ingester_uuid =
Uuid::parse_str(&md.ingester_uuid).context(IngesterUuidSnafu {
ingester_uuid: md.ingester_uuid,
@ -618,8 +598,6 @@ impl IngesterStreamDecoder {
partition_id,
partition_hash_id,
md.completed_persistence_count,
partition_sort_key,
partition_column_ranges,
);
self.current_partition = Some(partition);
}
@ -659,20 +637,47 @@ impl IngesterStreamDecoder {
}
/// Flush internal state and return sorted set of partitions.
fn finalize(mut self) -> Result<Vec<IngesterPartition>> {
async fn finalize(mut self) -> Result<Vec<IngesterPartition>> {
self.flush_partition()?;
let mut ids: Vec<_> = self.finished_partitions.keys().copied().collect();
ids.sort();
let mut partitions = self.finished_partitions.into_values().collect::<Vec<_>>();
let partitions = ids
.into_iter()
.map(|id| {
self.finished_partitions
.remove(&id)
.expect("just got key from this map")
// shuffle order to even catalog load, because cache hits/misses might be correlated w/ the order of the
// partitions.
//
// Note that we sort before shuffling to achieve a deterministic pseudo-random order
let mut rng = StdRng::seed_from_u64(self.cached_table.id.get() as u64);
partitions.sort_by_key(|p| p.partition_id);
partitions.shuffle(&mut rng);
let mut partitions = futures::stream::iter(partitions)
.map(|p| {
let cached_table = &self.cached_table;
let catalog_cache = &self.catalog_cache;
let span = self
.span_recorder
.child_span("fetch column ranges for partition");
async move {
// If the partition does NOT yet exist within the catalog, this is OK. We can deal without the ranges,
// the chunk pruning will not be as efficient though.
let ranges = catalog_cache
.partition()
.column_ranges(Arc::clone(cached_table), p.partition_id, span)
.await
.unwrap_or_default();
(p, ranges)
}
})
.collect();
.buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
.map(|(mut p, ranges)| {
p.set_partition_column_ranges(ranges);
p
})
.collect::<Vec<_>>()
.await;
// deterministic order
partitions.sort_by_key(|p| p.partition_id);
self.span_recorder.ok("finished");
Ok(partitions)
}
@ -822,12 +827,6 @@ pub struct IngesterPartition {
/// The number of Parquet files this ingester UUID has persisted for this partition.
completed_persistence_count: u64,
/// Partition-wide sort key.
partition_sort_key: Option<Arc<SortKey>>,
/// Partition-wide column ranges.
partition_column_ranges: ColumnRanges,
chunks: Vec<IngesterChunk>,
}
@ -839,16 +838,12 @@ impl IngesterPartition {
partition_id: PartitionId,
partition_hash_id: Option<PartitionHashId>,
completed_persistence_count: u64,
partition_sort_key: Option<Arc<SortKey>>,
partition_column_ranges: ColumnRanges,
) -> Self {
Self {
ingester_uuid,
partition_id,
partition_hash_id,
completed_persistence_count,
partition_sort_key,
partition_column_ranges,
chunks: vec![],
}
}
@ -876,25 +871,12 @@ impl IngesterPartition {
.map(|batch| ensure_schema(batch, &expected_schema))
.collect::<Result<Vec<RecordBatch>>>()?;
// TODO: may want to ask the Ingester to send this value instead of computing it here.
let ts_min_max = compute_timenanosecond_min_max(&batches).expect("Should have time range");
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>() as u64;
let stats = Arc::new(create_chunk_statistics(
row_count,
&expected_schema,
ts_min_max,
&self.partition_column_ranges,
));
let chunk = IngesterChunk {
chunk_id,
partition_id: self.partition_id,
schema: expected_schema,
partition_sort_key: self.partition_sort_key.clone(),
batches,
ts_min_max,
stats,
stats: None,
delete_predicates: vec![],
};
@ -903,6 +885,27 @@ impl IngesterPartition {
Ok(self)
}
pub(crate) fn set_partition_column_ranges(&mut self, partition_column_ranges: ColumnRanges) {
for chunk in &mut self.chunks {
// TODO: may want to ask the Ingester to send this value instead of computing it here.
let ts_min_max =
compute_timenanosecond_min_max(&chunk.batches).expect("Should have time range");
let row_count = chunk
.batches
.iter()
.map(|batch| batch.num_rows())
.sum::<usize>() as u64;
let stats = Arc::new(create_chunk_statistics(
row_count,
&chunk.schema,
ts_min_max,
&partition_column_ranges,
));
chunk.stats = Some(stats);
}
}
pub(crate) fn ingester_uuid(&self) -> Uuid {
self.ingester_uuid
}
@ -943,17 +946,13 @@ pub struct IngesterChunk {
partition_id: PartitionId,
schema: Schema,
/// Partition-wide sort key.
partition_sort_key: Option<Arc<SortKey>>,
/// The raw table data
batches: Vec<RecordBatch>,
/// Timestamp-specific stats
ts_min_max: TimestampMinMax,
/// Summary Statistics
stats: Arc<Statistics>,
///
/// Set to `None` if not calculated yet.
stats: Option<Arc<Statistics>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
}
@ -982,7 +981,7 @@ impl IngesterChunk {
impl QueryChunkMeta for IngesterChunk {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
Arc::clone(self.stats.as_ref().expect("chunk stats set"))
}
fn schema(&self) -> &Schema {
@ -1854,8 +1853,6 @@ mod tests {
&PartitionKey::from("arbitrary"),
)),
0,
None,
Default::default(),
)
.try_add_chunk(ChunkId::new(), expected_schema.clone(), vec![case])
.unwrap();
@ -1886,8 +1883,6 @@ mod tests {
&PartitionKey::from("arbitrary"),
)),
0,
None,
Default::default(),
)
.try_add_chunk(ChunkId::new(), expected_schema, vec![batch])
.unwrap_err();

View File

@ -1,5 +1,5 @@
use super::IngesterConnection;
use crate::{cache::namespace::CachedTable, df_stats::create_chunk_statistics};
use crate::cache::namespace::CachedTable;
use async_trait::async_trait;
use data_types::NamespaceId;
use parking_lot::Mutex;
@ -67,44 +67,25 @@ impl IngesterConnection for MockIngesterConnection {
let partitions = partitions
.into_iter()
.map(|mut p| async move {
let column_ranges = Arc::clone(&p.partition_column_ranges);
let chunks = p
.chunks
.into_iter()
.map(|ic| {
let column_ranges = Arc::clone(&column_ranges);
async move {
let batches: Vec<_> = ic
.batches
.iter()
.map(|batch| match ic.schema.df_projection(selection).unwrap() {
Some(projection) => batch.project(&projection).unwrap(),
None => batch.clone(),
})
.collect();
.map(|ic| async move {
let batches: Vec<_> = ic
.batches
.iter()
.map(|batch| match ic.schema.df_projection(selection).unwrap() {
Some(projection) => batch.project(&projection).unwrap(),
None => batch.clone(),
})
.collect();
assert!(!batches.is_empty(), "Error: empty batches");
let new_schema = IOxSchema::try_from(batches[0].schema()).unwrap();
let total_row_count =
batches.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
let stats = create_chunk_statistics(
total_row_count,
&new_schema,
ic.ts_min_max,
&column_ranges,
);
super::IngesterChunk {
chunk_id: ic.chunk_id,
partition_id: ic.partition_id,
schema: new_schema,
partition_sort_key: ic.partition_sort_key,
batches,
ts_min_max: ic.ts_min_max,
stats: Arc::new(stats),
delete_predicates: vec![],
}
assert!(!batches.is_empty(), "Error: empty batches");
let schema = IOxSchema::try_from(batches[0].schema()).unwrap();
super::IngesterChunk {
batches,
schema,
..ic
}
})
.collect::<Vec<_>>();

View File

@ -27,6 +27,11 @@ mod server;
mod system_tables;
mod table;
/// Number of concurrent chunk creation jobs.
///
/// This is mostly to fetch per-partition data concurrently.
const CONCURRENT_CHUNK_CREATION_JOBS: usize = 100;
pub use cache::CatalogCache as QuerierCatalogCache;
pub use database::{Error as QuerierDatabaseError, QuerierDatabase};
pub use ingester::{

View File

@ -20,15 +20,11 @@ use crate::{
df_stats::{create_chunk_statistics, ColumnRanges},
parquet::QuerierParquetChunkMeta,
table::MetricPruningObserver,
CONCURRENT_CHUNK_CREATION_JOBS,
};
use super::QuerierParquetChunk;
/// Number of concurrent chunk creation jobs.
///
/// This is mostly to fetch per-partition data concurrently.
const CONCURRENT_CHUNK_CREATION_JOBS: usize = 100;
/// Adapter that can create chunks.
#[derive(Debug)]
pub struct ChunkAdapter {

View File

@ -8,7 +8,7 @@ use data_types::ChunkId;
use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows};
use iox_tests::{TestCatalog, TestPartition, TestTable};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::{sort::SortKey, Projection, Schema};
use schema::{Projection, Schema};
use std::{sync::Arc, time::Duration};
use tokio::runtime::Handle;
use uuid::Uuid;
@ -67,7 +67,6 @@ pub(crate) struct IngesterPartitionBuilder {
partition: Arc<TestPartition>,
ingester_chunk_id: u128,
partition_sort_key: Option<Arc<SortKey>>,
partition_column_ranges: ColumnRanges,
/// Data returned from the partition, in line protocol format
@ -79,7 +78,6 @@ impl IngesterPartitionBuilder {
Self {
schema,
partition: Arc::clone(partition),
partition_sort_key: None,
partition_column_ranges: Default::default(),
ingester_chunk_id: 1,
lp: Vec::new(),
@ -103,19 +101,21 @@ impl IngesterPartitionBuilder {
pub(crate) fn build(&self) -> IngesterPartition {
let data = self.lp.iter().map(|lp| lp_to_record_batch(lp)).collect();
IngesterPartition::new(
let mut part = IngesterPartition::new(
Uuid::new_v4(),
self.partition.partition.id,
self.partition.partition.hash_id().cloned(),
0,
self.partition_sort_key.clone(),
Arc::clone(&self.partition_column_ranges),
)
.try_add_chunk(
ChunkId::new_test(self.ingester_chunk_id),
self.schema.clone(),
data,
)
.unwrap()
.unwrap();
part.set_partition_column_ranges(Arc::clone(&self.partition_column_ranges));
part
}
}