feat: intern schemas during query planning (#5215)

* feat: intern schemas during query planning

Helps with #5202.

* refactor: `SchemaMerger::build` shall return an `Arc`

* feat: `SchemaMerger::with_interner`

* refactor: hash-based schema interning
Marco Neumann 2022-08-11 12:28:51 +00:00 committed by GitHub
parent 4867c6b682
commit 90fec1365f
9 changed files with 208 additions and 40 deletions
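
The changes below wire a new `SchemaInterner` into `SchemaMerger`, so identical merged schemas are deduplicated into a single shared `Arc<Schema>` instead of being rebuilt for every chunk. A minimal sketch of how the pieces fit together, distilled from the `test_interning` test added in this commit (the helper names `merge_interned` and `intern_twice` are illustrative, not part of the change):

```rust
use std::sync::Arc;

use schema::{interner::SchemaInterner, merge::SchemaMerger, Schema};

/// Merge two schemas, routing the result through the interner.
fn merge_interned(interner: &mut SchemaInterner, a: &Schema, b: &Schema) -> Arc<Schema> {
    SchemaMerger::new()
        .with_interner(interner) // attach the interner to this merger
        .merge(a)
        .expect("schemas compatible")
        .merge(b)
        .expect("schemas compatible")
        .build() // `build()` now returns `Arc<Schema>`
}

fn intern_twice(a: &Schema, b: &Schema) {
    let mut interner = SchemaInterner::new();
    let merged_1 = merge_interned(&mut interner, a, b);
    let merged_2 = merge_interned(&mut interner, a, b);
    // Equal merge results collapse to one shared allocation.
    assert!(Arc::ptr_eq(&merged_1, &merged_2));
}
```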

Changed file 1 of 9:

@@ -88,7 +88,7 @@ impl QueryableParquetChunk {
         for chunk in chunks {
             merger = merger.merge(&chunk.schema()).expect("schemas compatible");
         }
-        Arc::new(merger.build())
+        merger.build()
     }

     /// Return max sequence number

Changed file 2 of 9:

@@ -226,6 +226,6 @@ mod test {
             .unwrap()
             .build();

-        (Arc::new(schema), vec![chunk1, chunk2])
+        (schema, vec![chunk1, chunk2])
     }
 }

Changed file 3 of 9:

@@ -261,7 +261,7 @@ mod test {
             .unwrap()
             .build();

-        (Arc::new(schema), vec![chunk1, chunk2])
+        (schema, vec![chunk1, chunk2])
     }

     #[tokio::test]

Changed file 4 of 9:

@@ -22,7 +22,9 @@ use datafusion::{
 };
 use observability_deps::tracing::{debug, trace, warn};
 use predicate::Predicate;
-use schema::{merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema};
+use schema::{
+    interner::SchemaInterner, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema,
+};

 use crate::{
     compute_sort_key_for_chunks,
@@ -323,6 +325,9 @@ pub(crate) struct Deduplicater {
     /// a vector of non-overlapped and non-duplicates chunks
     pub no_duplicates_chunks: Vec<Arc<dyn QueryChunk>>,

+    /// schema interner
+    schema_interner: SchemaInterner,
+
     // execution context
     ctx: IOxSessionContext,
 }
@@ -333,6 +338,7 @@ impl Deduplicater {
             overlapped_chunks_set: vec![],
             in_chunk_duplicates_chunks: vec![],
             no_duplicates_chunks: vec![],
+            schema_interner: Default::default(),
             ctx,
         }
     }
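
The hunks that follow thread the interner through the planner: `Deduplicater` now owns one `SchemaInterner` for the whole planning pass and lends it mutably to every plan-building helper, so the primary-key and input schemas recomputed per chunk group collapse into shared `Arc`s. A trimmed-down sketch of that pattern, assuming a hypothetical `Planner` type rather than the real `Deduplicater` API:

```rust
use std::sync::Arc;

use schema::{interner::SchemaInterner, merge::SchemaMerger, Schema};

/// Hypothetical stand-in for the planner: it owns the interner and passes it
/// down by `&mut` wherever schemas get merged.
struct Planner {
    schema_interner: SchemaInterner,
}

impl Planner {
    fn merged_schema(schemas: &[Schema], interner: &mut SchemaInterner) -> Arc<Schema> {
        let mut merger = SchemaMerger::new().with_interner(interner);
        for schema in schemas {
            merger = merger.merge(schema).expect("schemas compatible");
        }
        merger.build()
    }

    fn plan(&mut self, chunk_groups: &[Vec<Schema>]) -> Vec<Arc<Schema>> {
        chunk_groups
            .iter()
            .map(|group| Self::merged_schema(group, &mut self.schema_interner))
            .collect()
    }
}
```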
@@ -435,10 +441,11 @@ impl Deduplicater {
                 chunks,
                 predicate,
                 output_sort_key.as_ref(),
+                &mut self.schema_interner,
             )?;
             plans.append(&mut non_duplicate_plans);
         } else {
-            let pk_schema = Self::compute_pk_schema(&chunks);
+            let pk_schema = Self::compute_pk_schema(&chunks, &mut self.schema_interner);
             debug!(overlapped_chunks=?self.overlapped_chunks_set.len(),
                 in_chunk_duplicates=?self.in_chunk_duplicates_chunks.len(),
                 no_duplicates_chunks=?self.no_duplicates_chunks.len(),
@@ -495,6 +502,7 @@ impl Deduplicater {
                 overlapped_chunks,
                 predicate.clone(),
                 &chunks_dedup_sort_key,
+                &mut self.schema_interner,
             )?);
         }
@@ -524,6 +532,7 @@ impl Deduplicater {
                 chunk_with_duplicates,
                 predicate.clone(),
                 &chunk_dedup_sort_key,
+                &mut self.schema_interner,
             )?);
         }
@@ -542,6 +551,7 @@ impl Deduplicater {
                 self.no_duplicates_chunks.to_vec(),
                 predicate,
                 output_sort_key.as_ref(),
+                &mut self.schema_interner,
             )?;
             plans.append(&mut non_duplicate_plans);
         }
@@ -761,6 +771,7 @@ impl Deduplicater {
         chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified overlapped
         predicate: Predicate,
         output_sort_key: &SortKey,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Note that we may need to sort/deduplicate based on tag
         // columns which do not appear in the output
@@ -773,8 +784,8 @@ impl Deduplicater {
             chunks
         };

-        let pk_schema = Self::compute_pk_schema(&chunks);
-        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
+        let pk_schema = Self::compute_pk_schema(&chunks, schema_interner);
+        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner);

         debug!(
             ?output_schema,
@@ -794,6 +805,7 @@ impl Deduplicater {
                     Arc::clone(chunk),
                     predicate.clone(),
                     Some(output_sort_key),
+                    schema_interner,
                 )
             })
             .collect();
@@ -850,9 +862,10 @@ impl Deduplicater {
         chunk: Arc<dyn QueryChunk>, // This chunk is identified having duplicates
         predicate: Predicate,
         output_sort_key: &SortKey,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]);
-        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
+        let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)], schema_interner);
+        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner);

         debug!(
             ?output_schema,
@@ -872,6 +885,7 @@ impl Deduplicater {
             Arc::clone(&chunks[0]),
             predicate,
             Some(output_sort_key),
+            schema_interner,
         )?;

         // Add DeduplicateExec
@@ -987,6 +1001,7 @@ impl Deduplicater {
         chunk: Arc<dyn QueryChunk>,
         predicate: Predicate, // This is the select predicate of the query
         output_sort_key: Option<&SortKey>,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode
         // This is needed because columns in select query may not include them yet
@@ -1004,7 +1019,10 @@ impl Deduplicater {
         // 2. ensures that all columns necessary to perform the sort are present
         // 3. ensures that all columns necessary to evaluate the delete predicates are present
         trace!("Build sort plan for a single chunk. Sort node won't be added if the plan is already sorted");
-        let mut schema_merger = SchemaMerger::new().merge(&output_schema).unwrap();
+        let mut schema_merger = SchemaMerger::new()
+            .with_interner(schema_interner)
+            .merge(&output_schema)
+            .unwrap();
         let chunk_schema = chunk.schema();
         trace!(?chunk_schema, "chunk schema");
@@ -1036,7 +1054,7 @@ impl Deduplicater {
         let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
             ctx,
             Arc::clone(&table_name),
-            Arc::new(input_schema),
+            input_schema,
             vec![Arc::clone(&chunk)],
             predicate,
         ));
@@ -1147,6 +1165,7 @@ impl Deduplicater {
         chunk: Arc<dyn QueryChunk>, // This chunk is identified having no duplicates
         predicate: Predicate,
         output_sort_key: Option<&SortKey>,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Self::build_sort_plan_for_read_filter(
             ctx,
@@ -1155,6 +1174,7 @@ impl Deduplicater {
             chunk,
             predicate,
             output_sort_key,
+            schema_interner,
         )
     }
@@ -1196,6 +1216,7 @@ impl Deduplicater {
         chunks: Vec<Arc<dyn QueryChunk>>, // These chunks is identified having no duplicates
         predicate: Predicate,
         output_sort_key: Option<&SortKey>,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
         let mut plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
@@ -1226,6 +1247,7 @@ impl Deduplicater {
                     Arc::clone(chunk),
                     predicate.clone(),
                     output_sort_key,
+                    schema_interner,
                 )
             })
             .collect();
@@ -1240,8 +1262,11 @@ impl Deduplicater {
     }

     /// Find the columns needed in chunks' primary keys across schemas
-    fn compute_pk_schema(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
-        let mut schema_merger = SchemaMerger::new();
+    fn compute_pk_schema(
+        chunks: &[Arc<dyn QueryChunk>],
+        schema_interner: &mut SchemaInterner,
+    ) -> Arc<Schema> {
+        let mut schema_merger = SchemaMerger::new().with_interner(schema_interner);
         for chunk in chunks {
             let chunk_schema = chunk.schema();
             for (column_type, field) in chunk_schema.iter() {
@@ -1256,19 +1281,23 @@ impl Deduplicater {
             }
         }

-        Arc::new(schema_merger.build())
+        schema_merger.build()
     }

     /// Find columns required to read from each scan: the output columns + the
     /// primary key columns
-    fn compute_input_schema(output_schema: &Schema, pk_schema: &Schema) -> Arc<Schema> {
-        let input_schema = SchemaMerger::new()
+    fn compute_input_schema(
+        output_schema: &Schema,
+        pk_schema: &Schema,
+        schema_interner: &mut SchemaInterner,
+    ) -> Arc<Schema> {
+        SchemaMerger::new()
+            .with_interner(schema_interner)
             .merge(output_schema)
             .unwrap()
             .merge(pk_schema)
             .unwrap()
-            .build();
-
-        Arc::new(input_schema)
+            .build()
     }
 }
@@ -1587,6 +1616,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             Some(&sort_key.clone()),
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1629,6 +1659,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             Some(&sort_key),
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1663,6 +1694,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             &sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1693,6 +1725,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             &sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1739,6 +1772,7 @@ mod test {
             vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
             Predicate::default(),
             None, // not ask to sort the output of the plan
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1759,6 +1793,7 @@ mod test {
             vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
             Predicate::default(),
             Some(&sort_key), // sort output on this sort_key
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1924,6 +1959,7 @@ mod test {
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -1984,6 +2020,7 @@ mod test {
             vec![chunk1, chunk2],
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
@@ -2059,6 +2096,7 @@ mod test {
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
         let batch = test_collect(sort_plan).await;
@@ -2153,6 +2191,7 @@ mod test {
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
         let batch = test_collect(sort_plan).await;
@@ -2253,10 +2292,11 @@ mod test {
         let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
             IOxSessionContext::with_testing(),
             Arc::from("t"),
-            Arc::new(schema),
+            schema,
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
         let batch = test_collect(sort_plan).await;

Changed file 5 of 9:

@@ -519,7 +519,7 @@ mod test {
         for chunk in chunks {
             merger = merger.merge(chunk.schema().as_ref()).unwrap();
         }

-        Arc::new(merger.build())
+        merger.build()
     }

     #[test]

Changed file 6 of 9:

@@ -150,7 +150,7 @@ impl QueryDatabaseMeta for TestDatabase {
             }
         }

-        found_one.then(|| Arc::new(merger.build()))
+        found_one.then(|| merger.build())
     }

     fn table_names(&self) -> Vec<String> {
@@ -519,7 +519,7 @@ impl TestChunk {
         merger = merger
             .merge(self.schema.as_ref())
             .expect("merging was successful");
-        self.schema = Arc::new(merger.build());
+        self.schema = merger.build();

         if add_column_summary {
             let influxdb_type = col_type.map(|t| match t {

Changed file 7 of 9: schema/src/interner.rs (new file, 57 lines)

@@ -0,0 +1,57 @@
+use std::{collections::HashSet, sync::Arc};
+
+use crate::Schema;
+
+/// Helper that handles [Interning] for [`Schema`]s.
+///
+/// Note that this is rather expensive since the interner needs to compare the entire schema, so if you find another
+/// key to to store your schema (e.g. a table ID), use a `HashMap<K, Arc<Schema>>` instead.
+///
+/// [Interning]: https://en.wikipedia.org/wiki/Interning_(computer_science)
+#[derive(Debug, Default)]
+pub struct SchemaInterner {
+    schemas: HashSet<Arc<Schema>>,
+}
+
+impl SchemaInterner {
+    /// Create new, empty interner.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Intern schema.
+    pub fn intern(&mut self, schema: Schema) -> Arc<Schema> {
+        if let Some(schema) = self.schemas.get(&schema) {
+            Arc::clone(schema)
+        } else {
+            let schema = Arc::new(schema);
+            self.schemas.insert(Arc::clone(&schema));
+            schema
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::builder::SchemaBuilder;
+
+    use super::*;
+
+    #[test]
+    fn test() {
+        let mut interner = SchemaInterner::default();
+
+        let schema_1a = SchemaBuilder::new().tag("t1").tag("t2").build().unwrap();
+        let schema_1b = SchemaBuilder::new().tag("t1").tag("t2").build().unwrap();
+        let schema_2 = SchemaBuilder::new().tag("t1").tag("t3").build().unwrap();
+
+        let interned_1a = interner.intern(schema_1a.clone());
+        assert_eq!(interned_1a.as_ref(), &schema_1a);
+
+        let interned_1b = interner.intern(schema_1b);
+        assert!(Arc::ptr_eq(&interned_1a, &interned_1b));
+
+        let interned_2 = interner.intern(schema_2.clone());
+        assert_eq!(interned_2.as_ref(), &schema_2);
+    }
+}
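
As the doc comment above notes, the interner has to hash and compare whole schemas; when a cheaper key such as a table ID is available, a plain map keyed by it avoids that cost entirely. A hypothetical sketch of that alternative (the `TableId` and `SchemaCache` types are made up for illustration and are not part of this commit):

```rust
use std::{collections::HashMap, sync::Arc};

use schema::Schema;

/// Hypothetical key type; in practice this could be a catalog table ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TableId(u64);

/// Cache schemas by table ID so lookups never hash or compare a whole `Schema`.
#[derive(Debug, Default)]
struct SchemaCache {
    by_table: HashMap<TableId, Arc<Schema>>,
}

impl SchemaCache {
    fn get_or_create(&mut self, id: TableId, build: impl FnOnce() -> Schema) -> Arc<Schema> {
        Arc::clone(
            self.by_table
                .entry(id)
                .or_insert_with(|| Arc::new(build())),
        )
    }
}
```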

Changed file 8 of 9:

@@ -39,6 +39,7 @@ pub fn TIME_DATA_TYPE() -> ArrowDataType {
 }

 pub mod builder;
+pub mod interner;
 pub mod merge;
 pub mod selection;
 pub mod sort;
@@ -83,7 +84,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Specifically, each column in the Arrow schema has a corresponding
 /// InfluxDB data model type of Tag, Field or Timestamp which is stored in
 /// the metadata field of the ArrowSchemaRef
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Schema {
     /// All the actual data lives on the metadata structure in
     /// `ArrowSchemaRef` and this structure knows how to access that

Changed file 9 of 9:

@@ -8,6 +8,8 @@ use hashbrown::hash_map::RawEntryMut;
 use hashbrown::HashMap;
 use snafu::Snafu;

+use crate::interner::SchemaInterner;
+
 use super::{InfluxColumnType, Schema};

 /// Database schema creation / validation errors.
@@ -81,7 +83,7 @@ pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
         let schema = Schema::try_from(batch.schema()).expect("Schema conversion error");
         merger = merger.merge(&schema).expect("Schemas compatible");
     }

-    Arc::new(merger.build())
+    merger.build()
 }

 /// Schema Merger
@@ -96,18 +98,30 @@ pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
 ///
 /// 2. The measurement names must be consistent: one or both can be
 ///    `None`, or they can both be `Some(name`)
-#[derive(Debug, Default, Clone)]
-pub struct SchemaMerger {
+#[derive(Debug, Default)]
+pub struct SchemaMerger<'a> {
     /// Maps column names to their definition
     fields: HashMap<String, (Field, Option<InfluxColumnType>)>,

     /// The measurement name if any
     measurement: Option<String>,
+
+    /// Interner, if any.
+    interner: Option<&'a mut SchemaInterner>,
 }

-impl SchemaMerger {
+impl SchemaMerger<'static> {
     pub fn new() -> Self {
         Self::default()
     }
+}
+
+impl<'a> SchemaMerger<'a> {
+    pub fn with_interner(self, interner: &mut SchemaInterner) -> SchemaMerger<'_> {
+        SchemaMerger {
+            fields: self.fields,
+            measurement: self.measurement,
+            interner: Some(interner),
+        }
+    }

     /// Appends the schema to the merged schema being built,
     /// validating that no columns are added.
@@ -185,13 +199,19 @@ impl SchemaMerger {
     }

     /// Returns the schema that was built, the columns are always sorted in lexicographic order
-    pub fn build(mut self) -> Schema {
-        Schema::new_from_parts(
+    pub fn build(mut self) -> Arc<Schema> {
+        let schema = Schema::new_from_parts(
             self.measurement.take(),
             self.fields.drain().map(|x| x.1),
             true,
         )
-        .expect("failed to build merged schema")
+        .expect("failed to build merged schema");
+
+        if let Some(interner) = self.interner.as_mut() {
+            interner.intern(schema)
+        } else {
+            Arc::new(schema)
+        }
     }
 }
@@ -223,8 +243,8 @@ mod tests {
             .unwrap()
             .build();

-        assert_eq!(merged_schema, schema1);
-        assert_eq!(merged_schema, schema2);
+        assert_eq!(merged_schema.as_ref(), &schema1);
+        assert_eq!(merged_schema.as_ref(), &schema2);
     }

     #[test]
@@ -264,9 +284,11 @@ mod tests {
             .sort_fields_by_name();

         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
@@ -292,9 +314,11 @@ mod tests {
             .unwrap();

         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
@@ -324,9 +348,11 @@ mod tests {
             .unwrap();

         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
@@ -354,9 +380,11 @@ mod tests {
             .unwrap();

         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
@@ -450,4 +478,46 @@ mod tests {
         assert_eq!(merged_schema_error.to_string(), "Schema Merge Error: Incompatible nullability for 'int_field'. Existing field can not be null, new field can be null");
     }
+
+    #[test]
+    fn test_interning() {
+        let schema_1a = SchemaBuilder::new()
+            .influx_field("int_field", Integer)
+            .tag("the_tag")
+            .build()
+            .unwrap();
+        let schema_1b = SchemaBuilder::new()
+            .influx_field("int_field", Integer)
+            .tag("the_tag")
+            .build()
+            .unwrap();
+        let schema_2 = SchemaBuilder::new()
+            .influx_field("float_field", crate::InfluxFieldType::Float)
+            .tag("the_tag")
+            .build()
+            .unwrap();
+
+        let mut interner = SchemaInterner::new();
+
+        let merged_schema_a = SchemaMerger::new()
+            .with_interner(&mut interner)
+            .merge(&schema_1a)
+            .unwrap()
+            .merge(&schema_2)
+            .unwrap()
+            .build();
+        let merged_schema_b = SchemaMerger::new()
+            .with_interner(&mut interner)
+            .merge(&schema_1b)
+            .unwrap()
+            .merge(&schema_2)
+            .unwrap()
+            .build();
+
+        assert_eq!(merged_schema_a, merged_schema_b);
+        assert!(Arc::ptr_eq(&merged_schema_a, &merged_schema_b));
+    }
 }