diff --git a/compactor/src/query.rs b/compactor/src/query.rs
index 285e67ecd3..b1d681ffb1 100644
--- a/compactor/src/query.rs
+++ b/compactor/src/query.rs
@@ -88,7 +88,7 @@ impl QueryableParquetChunk {
         for chunk in chunks {
             merger = merger.merge(&chunk.schema()).expect("schemas compatible");
         }
-        Arc::new(merger.build())
+        merger.build()
     }
 
     /// Return max sequence number
diff --git a/iox_query/src/frontend.rs b/iox_query/src/frontend.rs
index 67b6d7f8e6..0e8ad6bd00 100644
--- a/iox_query/src/frontend.rs
+++ b/iox_query/src/frontend.rs
@@ -226,6 +226,6 @@ mod test {
             .unwrap()
             .build();
 
-        (Arc::new(schema), vec![chunk1, chunk2])
+        (schema, vec![chunk1, chunk2])
     }
 }
diff --git a/iox_query/src/frontend/reorg.rs b/iox_query/src/frontend/reorg.rs
index c6f6b23611..e42c08de1e 100644
--- a/iox_query/src/frontend/reorg.rs
+++ b/iox_query/src/frontend/reorg.rs
@@ -261,7 +261,7 @@ mod test {
             .unwrap()
             .build();
 
-        (Arc::new(schema), vec![chunk1, chunk2])
+        (schema, vec![chunk1, chunk2])
     }
 
     #[tokio::test]
diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs
index 4be50d3020..37a5c6502b 100644
--- a/iox_query/src/provider.rs
+++ b/iox_query/src/provider.rs
@@ -22,7 +22,9 @@ use datafusion::{
 };
 use observability_deps::tracing::{debug, trace, warn};
 use predicate::Predicate;
-use schema::{merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema};
+use schema::{
+    interner::SchemaInterner, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema,
+};
 
 use crate::{
     compute_sort_key_for_chunks,
@@ -323,6 +325,9 @@ pub(crate) struct Deduplicater {
     /// a vector of non-overlapped and non-duplicates chunks
     pub no_duplicates_chunks: Vec<Arc<dyn QueryChunk>>,
 
+    /// schema interner
+    schema_interner: SchemaInterner,
+
     // execution context
     ctx: IOxSessionContext,
 }
@@ -333,6 +338,7 @@ impl Deduplicater {
             overlapped_chunks_set: vec![],
             in_chunk_duplicates_chunks: vec![],
             no_duplicates_chunks: vec![],
+            schema_interner: Default::default(),
             ctx,
         }
     }
@@ -435,10 +441,11 @@ impl Deduplicater {
                 chunks,
                 predicate,
                 output_sort_key.as_ref(),
+                &mut self.schema_interner,
             )?;
             plans.append(&mut non_duplicate_plans);
         } else {
-            let pk_schema = Self::compute_pk_schema(&chunks);
+            let pk_schema = Self::compute_pk_schema(&chunks, &mut self.schema_interner);
             debug!(overlapped_chunks=?self.overlapped_chunks_set.len(),
                    in_chunk_duplicates=?self.in_chunk_duplicates_chunks.len(),
                    no_duplicates_chunks=?self.no_duplicates_chunks.len(),
@@ -495,6 +502,7 @@ impl Deduplicater {
                 overlapped_chunks,
                 predicate.clone(),
                 &chunks_dedup_sort_key,
+                &mut self.schema_interner,
             )?);
         }
 
@@ -524,6 +532,7 @@ impl Deduplicater {
                 chunk_with_duplicates,
                 predicate.clone(),
                 &chunk_dedup_sort_key,
+                &mut self.schema_interner,
             )?);
         }
 
@@ -542,6 +551,7 @@ impl Deduplicater {
                 self.no_duplicates_chunks.to_vec(),
                 predicate,
                 output_sort_key.as_ref(),
+                &mut self.schema_interner,
             )?;
             plans.append(&mut non_duplicate_plans);
         }
@@ -761,6 +771,7 @@ impl Deduplicater {
         chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified overlapped
         predicate: Predicate,
         output_sort_key: &SortKey,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Note that we may need to sort/deduplicate based on tag
         // columns which do not appear in the output
@@ -773,8 +784,8 @@ impl Deduplicater {
             chunks
         };
 
-        let pk_schema = Self::compute_pk_schema(&chunks);
-        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
+        let pk_schema = Self::compute_pk_schema(&chunks, schema_interner);
+        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner);
 
         debug!(
            ?output_schema,
@@ -794,6 +805,7 @@ impl Deduplicater {
                     Arc::clone(chunk),
                     predicate.clone(),
                     Some(output_sort_key),
+                    schema_interner,
                 )
             })
             .collect();
@@ -850,9 +862,10 @@ impl Deduplicater {
         chunk: Arc<dyn QueryChunk>, // This chunk is identified having duplicates
         predicate: Predicate,
         output_sort_key: &SortKey,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]);
-        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
+        let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)], schema_interner);
+        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner);
 
         debug!(
             ?output_schema,
@@ -872,6 +885,7 @@ impl Deduplicater {
             Arc::clone(&chunks[0]),
             predicate,
             Some(output_sort_key),
+            schema_interner,
         )?;
 
         // Add DeduplicateExec
@@ -987,6 +1001,7 @@ impl Deduplicater {
         chunk: Arc<dyn QueryChunk>,
         predicate: Predicate, // This is the select predicate of the query
         output_sort_key: Option<&SortKey>,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode
         // This is needed because columns in select query may not include them yet
@@ -1004,7 +1019,10 @@ impl Deduplicater {
         // 2. ensures that all columns necessary to perform the sort are present
         // 3. ensures that all columns necessary to evaluate the delete predicates are present
         trace!("Build sort plan for a single chunk. Sort node won't be added if the plan is already sorted");
-        let mut schema_merger = SchemaMerger::new().merge(&output_schema).unwrap();
+        let mut schema_merger = SchemaMerger::new()
+            .with_interner(schema_interner)
+            .merge(&output_schema)
+            .unwrap();
         let chunk_schema = chunk.schema();
         trace!(?chunk_schema, "chunk schema");
 
@@ -1036,7 +1054,7 @@ impl Deduplicater {
         let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
             ctx,
             Arc::clone(&table_name),
-            Arc::new(input_schema),
+            input_schema,
             vec![Arc::clone(&chunk)],
             predicate,
         ));
@@ -1147,6 +1165,7 @@ impl Deduplicater {
         chunk: Arc<dyn QueryChunk>, // This chunk is identified having no duplicates
         predicate: Predicate,
         output_sort_key: Option<&SortKey>,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Self::build_sort_plan_for_read_filter(
             ctx,
@@ -1155,6 +1174,7 @@ impl Deduplicater {
             chunk,
             predicate,
             output_sort_key,
+            schema_interner,
         )
     }
 
@@ -1196,6 +1216,7 @@ impl Deduplicater {
         chunks: Vec<Arc<dyn QueryChunk>>, // These chunks is identified having no duplicates
         predicate: Predicate,
         output_sort_key: Option<&SortKey>,
+        schema_interner: &mut SchemaInterner,
     ) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
         let mut plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
 
@@ -1226,6 +1247,7 @@ impl Deduplicater {
                     Arc::clone(chunk),
                     predicate.clone(),
                     output_sort_key,
+                    schema_interner,
                 )
             })
             .collect();
@@ -1240,8 +1262,11 @@ impl Deduplicater {
     }
 
     /// Find the columns needed in chunks' primary keys across schemas
-    fn compute_pk_schema(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
-        let mut schema_merger = SchemaMerger::new();
+    fn compute_pk_schema(
+        chunks: &[Arc<dyn QueryChunk>],
+        schema_interner: &mut SchemaInterner,
+    ) -> Arc<Schema> {
+        let mut schema_merger = SchemaMerger::new().with_interner(schema_interner);
         for chunk in chunks {
             let chunk_schema = chunk.schema();
             for (column_type, field) in chunk_schema.iter() {
@@ -1256,19 +1281,23 @@ impl Deduplicater {
             }
         }
 
-        Arc::new(schema_merger.build())
+        schema_merger.build()
    }
 
     /// Find columns required to read from each scan: the output columns + the
     /// primary key columns
-    fn compute_input_schema(output_schema: &Schema, pk_schema: &Schema) -> Arc<Schema> {
-        let input_schema = SchemaMerger::new()
+    fn compute_input_schema(
+        output_schema: &Schema,
+        pk_schema: &Schema,
+        schema_interner: &mut SchemaInterner,
+    ) -> Arc<Schema> {
+        SchemaMerger::new()
+            .with_interner(schema_interner)
             .merge(output_schema)
             .unwrap()
             .merge(pk_schema)
             .unwrap()
-            .build();
-        Arc::new(input_schema)
+            .build()
     }
 }
 
@@ -1587,6 +1616,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             Some(&sort_key.clone()),
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1629,6 +1659,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             Some(&sort_key),
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1663,6 +1694,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             &sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1693,6 +1725,7 @@ mod test {
             Arc::clone(&chunk),
             Predicate::default(),
             &sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1739,6 +1772,7 @@ mod test {
             vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
             Predicate::default(),
             None, // not ask to sort the output of the plan
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1759,6 +1793,7 @@ mod test {
             vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
             Predicate::default(),
             Some(&sort_key), // sort output on this sort_key
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1924,6 +1959,7 @@ mod test {
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -1984,6 +2020,7 @@ mod test {
             vec![chunk1, chunk2],
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
 
@@ -2059,6 +2096,7 @@ mod test {
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
         let batch = test_collect(sort_plan).await;
@@ -2153,6 +2191,7 @@ mod test {
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
         let batch = test_collect(sort_plan).await;
@@ -2253,10 +2292,11 @@ mod test {
         let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
             IOxSessionContext::with_testing(),
             Arc::from("t"),
-            Arc::new(schema),
+            schema,
             chunks,
             Predicate::default(),
             &output_sort_key,
+            &mut SchemaInterner::default(),
         )
         .unwrap();
         let batch = test_collect(sort_plan).await;
diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs
index a9489b5848..53ab2d91b4 100644
--- a/iox_query/src/pruning.rs
+++ b/iox_query/src/pruning.rs
@@ -519,7 +519,7 @@ mod test {
         for chunk in chunks {
             merger = merger.merge(chunk.schema().as_ref()).unwrap();
         }
-        Arc::new(merger.build())
+        merger.build()
     }
 
     #[test]
diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs
index 827a418d3e..a99bd2f19c 100644
--- a/iox_query/src/test.rs
+++ b/iox_query/src/test.rs
@@ -150,7 +150,7 @@ impl QueryDatabaseMeta for TestDatabase {
             }
         }
 
-        found_one.then(|| Arc::new(merger.build()))
+        found_one.then(|| merger.build())
     }
 
     fn table_names(&self) -> Vec<String> {
@@ -519,7 +519,7 @@ impl TestChunk {
         merger = merger
             .merge(self.schema.as_ref())
             .expect("merging was successful");
-        self.schema = Arc::new(merger.build());
+        self.schema = merger.build();
 
         if add_column_summary {
             let influxdb_type = col_type.map(|t| match t {
diff --git a/schema/src/interner.rs b/schema/src/interner.rs
new file mode 100644
index 0000000000..c78f2604a3
--- /dev/null
+++ b/schema/src/interner.rs
@@ -0,0 +1,57 @@
+use std::{collections::HashSet, sync::Arc};
+
+use crate::Schema;
+
+/// Helper that handles [Interning] for [`Schema`]s.
+///
+/// Note that this is rather expensive since the interner needs to compare the entire schema, so if you find another
+/// key to store your schema (e.g. a table ID), use a `HashMap<K, Arc<Schema>>` instead.
+///
+/// [Interning]: https://en.wikipedia.org/wiki/Interning_(computer_science)
+#[derive(Debug, Default)]
+pub struct SchemaInterner {
+    schemas: HashSet<Arc<Schema>>,
+}
+
+impl SchemaInterner {
+    /// Create new, empty interner.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Intern schema.
+    pub fn intern(&mut self, schema: Schema) -> Arc<Schema> {
+        if let Some(schema) = self.schemas.get(&schema) {
+            Arc::clone(schema)
+        } else {
+            let schema = Arc::new(schema);
+            self.schemas.insert(Arc::clone(&schema));
+            schema
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::builder::SchemaBuilder;
+
+    use super::*;
+
+    #[test]
+    fn test() {
+        let mut interner = SchemaInterner::default();
+
+        let schema_1a = SchemaBuilder::new().tag("t1").tag("t2").build().unwrap();
+        let schema_1b = SchemaBuilder::new().tag("t1").tag("t2").build().unwrap();
+        let schema_2 = SchemaBuilder::new().tag("t1").tag("t3").build().unwrap();
+
+        let interned_1a = interner.intern(schema_1a.clone());
+        assert_eq!(interned_1a.as_ref(), &schema_1a);
+
+        let interned_1b = interner.intern(schema_1b);
+        assert!(Arc::ptr_eq(&interned_1a, &interned_1b));
+
+        let interned_2 = interner.intern(schema_2.clone());
+        assert_eq!(interned_2.as_ref(), &schema_2);
+    }
+}
diff --git a/schema/src/lib.rs b/schema/src/lib.rs
index ccba0d511c..2dc571b4be 100644
--- a/schema/src/lib.rs
+++ b/schema/src/lib.rs
@@ -39,6 +39,7 @@ pub fn TIME_DATA_TYPE() -> ArrowDataType {
 }
 
 pub mod builder;
+pub mod interner;
 pub mod merge;
 pub mod selection;
 pub mod sort;
@@ -83,7 +84,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Specifically, each column in the Arrow schema has a corresponding
 /// InfluxDB data model type of Tag, Field or Timestamp which is stored in
 /// the metadata field of the ArrowSchemaRef
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Schema {
     /// All the actual data lives on the metadata structure in
     /// `ArrowSchemaRef` and this structure knows how to access that
diff --git a/schema/src/merge.rs b/schema/src/merge.rs
index b87446973b..a50e7290d2 100644
--- a/schema/src/merge.rs
+++ b/schema/src/merge.rs
@@ -8,6 +8,8 @@ use hashbrown::hash_map::RawEntryMut;
 use hashbrown::HashMap;
 use snafu::Snafu;
 
+use crate::interner::SchemaInterner;
+
 use super::{InfluxColumnType, Schema};
 
 /// Database schema creation / validation errors.
@@ -81,7 +83,7 @@ pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
         let schema = Schema::try_from(batch.schema()).expect("Schema conversion error");
         merger = merger.merge(&schema).expect("Schemas compatible");
     }
-    Arc::new(merger.build())
+    merger.build()
 }
 
 /// Schema Merger
@@ -96,18 +98,30 @@ pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
 ///
 /// 2. The measurement names must be consistent: one or both can be
 ///    `None`, or they can both be `Some(name`)
-#[derive(Debug, Default, Clone)]
-pub struct SchemaMerger {
+#[derive(Debug, Default)]
+pub struct SchemaMerger<'a> {
     /// Maps column names to their definition
     fields: HashMap<String, (Field, Option<InfluxColumnType>)>,
     /// The measurement name if any
     measurement: Option<String>,
+    /// Interner, if any.
+    interner: Option<&'a mut SchemaInterner>,
 }
 
-impl SchemaMerger {
+impl SchemaMerger<'static> {
     pub fn new() -> Self {
         Self::default()
     }
+}
+
+impl<'a> SchemaMerger<'a> {
+    pub fn with_interner(self, interner: &mut SchemaInterner) -> SchemaMerger<'_> {
+        SchemaMerger {
+            fields: self.fields,
+            measurement: self.measurement,
+            interner: Some(interner),
+        }
+    }
 
     /// Appends the schema to the merged schema being built,
     /// validating that no columns are added.
@@ -185,13 +199,19 @@
     }
 
     /// Returns the schema that was built, the columns are always sorted in lexicographic order
-    pub fn build(mut self) -> Schema {
-        Schema::new_from_parts(
+    pub fn build(mut self) -> Arc<Schema> {
+        let schema = Schema::new_from_parts(
             self.measurement.take(),
             self.fields.drain().map(|x| x.1),
             true,
         )
-        .expect("failed to build merged schema")
+        .expect("failed to build merged schema");
+
+        if let Some(interner) = self.interner.as_mut() {
+            interner.intern(schema)
+        } else {
+            Arc::new(schema)
+        }
     }
 }
@@ -223,8 +243,8 @@ mod tests {
             .unwrap()
             .build();
 
-        assert_eq!(merged_schema, schema1);
-        assert_eq!(merged_schema, schema2);
+        assert_eq!(merged_schema.as_ref(), &schema1);
+        assert_eq!(merged_schema.as_ref(), &schema2);
     }
 
     #[test]
@@ -264,9 +284,11 @@ mod tests {
             .sort_fields_by_name();
 
         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
 
@@ -292,9 +314,11 @@ mod tests {
             .unwrap();
 
         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
 
@@ -324,9 +348,11 @@ mod tests {
             .unwrap();
 
         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
        );
     }
 
@@ -354,9 +380,11 @@ mod tests {
             .unwrap();
 
         assert_eq!(
-            expected_schema, merged_schema,
+            &expected_schema,
+            merged_schema.as_ref(),
             "\nExpected:\n{:#?}\nActual:\n{:#?}",
-            expected_schema, merged_schema
+            expected_schema,
+            merged_schema
         );
     }
 
@@ -450,4 +478,46 @@ mod tests {
 
         assert_eq!(merged_schema_error.to_string(), "Schema Merge Error: Incompatible nullability for 'int_field'. Existing field can not be null, new field can be null");
     }
+
+    #[test]
+    fn test_interning() {
+        let schema_1a = SchemaBuilder::new()
+            .influx_field("int_field", Integer)
+            .tag("the_tag")
+            .build()
+            .unwrap();
+
+        let schema_1b = SchemaBuilder::new()
+            .influx_field("int_field", Integer)
+            .tag("the_tag")
+            .build()
+            .unwrap();
+
+        let schema_2 = SchemaBuilder::new()
+            .influx_field("float_field", crate::InfluxFieldType::Float)
+            .tag("the_tag")
+            .build()
+            .unwrap();
+
+        let mut interner = SchemaInterner::new();
+
+        let merged_schema_a = SchemaMerger::new()
+            .with_interner(&mut interner)
+            .merge(&schema_1a)
+            .unwrap()
+            .merge(&schema_2)
+            .unwrap()
+            .build();
+
+        let merged_schema_b = SchemaMerger::new()
+            .with_interner(&mut interner)
+            .merge(&schema_1b)
+            .unwrap()
+            .merge(&schema_2)
+            .unwrap()
+            .build();
+
+        assert_eq!(merged_schema_a, merged_schema_b);
+        assert!(Arc::ptr_eq(&merged_schema_a, &merged_schema_b));
+    }
 }
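---

Usage sketch, not part of the patch itself: with this change, merges that produce an
equal `Schema` through the same `SchemaInterner` hand back pointer-identical
`Arc<Schema>`s instead of fresh allocations. A minimal sketch against the patched
`schema` crate, assuming the workspace layout above; the standalone `main` and the
`host` tag name are illustrative only:

    use std::sync::Arc;

    use schema::{builder::SchemaBuilder, interner::SchemaInterner, merge::SchemaMerger};

    fn main() {
        // Two structurally identical schemas, e.g. as reported by two chunks
        // of the same table.
        let chunk_schema_a = SchemaBuilder::new().tag("host").build().unwrap();
        let chunk_schema_b = SchemaBuilder::new().tag("host").build().unwrap();

        // One interner for the whole planning pass, mirroring the
        // `schema_interner` field added to `Deduplicater` above.
        let mut interner = SchemaInterner::new();

        // Each merge chain borrows the interner mutably and releases it at
        // `build()`, so the chains run one after another.
        let merged_a = SchemaMerger::new()
            .with_interner(&mut interner)
            .merge(&chunk_schema_a)
            .unwrap()
            .build();
        let merged_b = SchemaMerger::new()
            .with_interner(&mut interner)
            .merge(&chunk_schema_b)
            .unwrap()
            .build();

        // Equal schemas now share one allocation.
        assert!(Arc::ptr_eq(&merged_a, &merged_b));
    }

Because `SchemaMerger::build()` now returns `Arc<Schema>` in all cases, call sites that
previously wrapped the result in `Arc::new` simply drop the wrapper, which accounts for
the bulk of the mechanical changes in this patch.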