diff --git a/db/src/catalog/chunk.rs b/db/src/catalog/chunk.rs index 3a9511cd37..624d7773a0 100644 --- a/db/src/catalog/chunk.rs +++ b/db/src/catalog/chunk.rs @@ -14,7 +14,7 @@ use observability_deps::tracing::debug; use parking_lot::Mutex; use parquet_file::chunk::ParquetChunk; use read_buffer::RBChunk; -use schema::{selection::Selection, Schema, TIME_COLUMN_NAME}; +use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; use snafu::Snafu; use std::sync::Arc; use time::{Time, TimeProvider}; @@ -88,6 +88,9 @@ pub struct ChunkMetadata { /// into IOx. Note due to the compaction, etc... this may not be the chunk /// that data was originally written into pub time_of_last_write: Time, + + /// Sort key for this chunk + pub sort_key: Option, } /// Different memory representations of a frozen chunk. @@ -458,11 +461,11 @@ impl CatalogChunk { self.order } - pub fn schema(&self) -> Arc { + pub fn sort_key(&self) -> Option<&SortKey> { match &self.stage { - ChunkStage::Open { mb_chunk, .. } => Arc::new(mb_chunk.schema(Selection::All).unwrap()), + ChunkStage::Open { .. } => None, ChunkStage::Frozen { meta, .. } | ChunkStage::Persisted { meta, .. 
} => { - Arc::clone(&meta.schema) + meta.sort_key.as_ref() } } } @@ -532,6 +535,7 @@ impl CatalogChunk { delete_predicates: del_preds, time_of_first_write: meta.time_of_first_write, time_of_last_write: meta.time_of_last_write, + sort_key: meta.sort_key.clone(), }); } } @@ -762,6 +766,7 @@ impl CatalogChunk { delete_predicates, time_of_first_write: *time_of_first_write, time_of_last_write: *time_of_last_write, + sort_key: None, }; self.stage = ChunkStage::Frozen { @@ -1293,6 +1298,7 @@ mod tests { delete_predicates: vec![], time_of_first_write: now, time_of_last_write: now, + sort_key: metadata.sort_key, }; CatalogChunk::new_object_store_only( diff --git a/db/src/chunk.rs b/db/src/chunk.rs index 89f302f2f4..ddbdec0641 100644 --- a/db/src/chunk.rs +++ b/db/src/chunk.rs @@ -121,6 +121,7 @@ impl DbChunk { delete_predicates: vec![], // open chunk does not have delete predicate time_of_first_write: *time_of_first_write, time_of_last_write: *time_of_last_write, + sort_key: None, }; (state, Arc::new(meta)) } @@ -535,20 +536,6 @@ impl QueryChunk for DbChunk { } } - /// Returns true if the chunk is sorted on its pk - /// Since data is compacted prior being moved to RUBs, data in RUBs and OBs - /// should be sorted on their PK as the results of compacting. - /// However, since we current sorted data based on their cardinality (see compute_sort_key), - /// 2 different chunks may be sorted on different order of key columns. - fn is_sorted_on_pk(&self) -> bool { - self.schema().is_sorted_on_pk() - } - - /// Returns the sort key of the chunk if any - fn sort_key(&self) -> Option> { - self.meta.schema.sort_key() - } - fn chunk_type(&self) -> &str { match &self.state { State::MutableBuffer { .. 
} => "MUB", @@ -571,6 +558,10 @@ impl QueryChunkMeta for DbChunk { Arc::clone(&self.meta.schema) } + fn sort_key(&self) -> Option<&SortKey> { + self.meta.sort_key.as_ref() + } + // return a reference to delete predicates of the chunk fn delete_predicates(&self) -> &[Arc] { let pred = &self.meta.delete_predicates; diff --git a/db/src/lib.rs b/db/src/lib.rs index c756cb22c6..4d50ba9e3d 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1697,7 +1697,7 @@ mod tests { .id(); // A chunk is now in the object store and still in read buffer - let expected_parquet_size = 1247; + let expected_parquet_size = 1257; catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size); // now also in OS catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size); @@ -2128,7 +2128,7 @@ mod tests { // Read buffer + Parquet chunk size catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); - catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1248); + catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1259); // All the chunks should have different IDs assert_ne!(mb_chunk.id(), rb_chunk.id()); @@ -2245,7 +2245,7 @@ mod tests { let registry = test_db.metric_registry.as_ref(); // Read buffer + Parquet chunk size - let object_store_bytes = 1248; + let object_store_bytes = 1259; catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); catalog_chunk_size_bytes_metric_eq(registry, "object_store", object_store_bytes); @@ -2725,8 +2725,8 @@ mod tests { id: chunk_summaries[0].id, storage: ChunkStorage::ReadBufferAndObjectStore, lifecycle_action, - memory_bytes: 4088, // size of RB and OS chunks - object_store_bytes: 1557, // size of parquet file + memory_bytes: 4102, // size of RB and OS chunks + object_store_bytes: 1573, // size of parquet file row_count: 2, time_of_last_access: 
None, time_of_first_write: Time::from_timestamp_nanos(1), @@ -2776,7 +2776,7 @@ mod tests { assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1463); assert_eq!(db.catalog.metrics().memory().read_buffer(), 2550); - assert_eq!(db.catalog.metrics().memory().object_store(), 1538); + assert_eq!(db.catalog.metrics().memory().object_store(), 1552); } #[tokio::test] diff --git a/db/src/lifecycle/compact.rs b/db/src/lifecycle/compact.rs index d0873fe68d..ce56402474 100644 --- a/db/src/lifecycle/compact.rs +++ b/db/src/lifecycle/compact.rs @@ -91,8 +91,8 @@ pub(crate) fn compact_chunks( let summaries = query_chunks .iter() .map(|x| x.summary().expect("Chunk should have summary")); - let key = compute_sort_key(summaries); - let key_str = format!("\"{}\"", key); // for logging + + let sort_key = compute_sort_key(summaries); // build schema // @@ -102,9 +102,11 @@ pub(crate) fn compact_chunks( // partitions). let schema = merge_schemas(&query_chunks); - // Cannot move query_chunks as the sort key borrows the column names - let (schema, plan) = - ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?; + let plan = ReorgPlanner::new().compact_plan( + Arc::clone(&schema), + query_chunks, + sort_key.clone(), + )?; let physical_plan = ctx.prepare_plan(&plan).await?; let stream = ctx.execute_stream(physical_plan).await?; @@ -148,6 +150,7 @@ pub(crate) fn compact_chunks( delete_predicates, time_of_first_write, time_of_last_write, + sort_key: Some(sort_key.clone()), }; let (_, chunk) = partition.create_rub_chunk(None, max_order, metadata, rb_chunk); @@ -158,7 +161,7 @@ pub(crate) fn compact_chunks( info!(input_chunks=chunk_ids.len(), %rub_row_groups, %input_rows, %output_rows, - sort_key=%key_str, compaction_took = ?elapsed, fut_execution_duration= ?fut_now.elapsed(), + %sort_key, compaction_took = ?elapsed, fut_execution_duration= ?fut_now.elapsed(), rows_per_sec=?throughput, "chunk(s) compacted"); let snapshot = 
DbChunk::snapshot(&chunk.read()); diff --git a/db/src/lifecycle/compact_object_store.rs b/db/src/lifecycle/compact_object_store.rs index 62da983034..5c061cc3f5 100644 --- a/db/src/lifecycle/compact_object_store.rs +++ b/db/src/lifecycle/compact_object_store.rs @@ -33,6 +33,7 @@ use parquet_file::{ }; use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint}; use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use schema::sort::SortKey; use schema::Schema; use snafu::{OptionExt, ResultExt}; use std::{ @@ -85,6 +86,7 @@ pub(crate) fn compact_object_store_chunks( // The partition will be unlock after the chunks are marked and snaphot let compacting_os_chunks = mark_chunks_to_compact(partition, chunks, ®istration, compacted_chunk_id)?; + let delete_predicates_before = compacting_os_chunks.delete_predicates; let fut = async move { @@ -116,7 +118,9 @@ pub(crate) fn compact_object_store_chunks( time_of_first_write: compacting_os_chunks.time_of_first_write, time_of_last_write: compacting_os_chunks.time_of_last_write, chunk_order: compacting_os_chunks.max_order, + sort_key: Some(sort_key.clone()), }; + let compacted_and_persisted_chunk = persist_stream_to_chunk( &db, &partition_addr, @@ -145,6 +149,7 @@ pub(crate) fn compact_object_store_chunks( compacted_and_persisted_chunk.clone(), compacting_os_chunks.partition, delete_predicates_before, + sort_key.clone(), ) .await; @@ -339,16 +344,15 @@ async fn compact_chunks(db: &Db, query_chunks: &[Arc]) -> Result]) -> Result]) -> Result, - sort_key: String, + sort_key: SortKey, } /// Persist a provided stream to a new OS chunk @@ -443,6 +447,7 @@ async fn update_in_memory_catalog( parquet_chunk: Option>, partition: Arc>, delete_predicates_before: HashSet>, + sort_key: SortKey, ) -> Option> { // Acquire write lock to drop the old chunks while also getting delete predicates added during compaction let mut partition = partition.write(); @@ -474,6 +479,7 @@ async fn 
update_in_memory_catalog( table_summary: Arc::clone(parquet_chunk.table_summary()), schema: parquet_chunk.schema(), delete_predicates, + sort_key: Some(sort_key), time_of_first_write: iox_metadata.time_of_first_write, time_of_last_write: iox_metadata.time_of_last_write, }; diff --git a/db/src/lifecycle/persist.rs b/db/src/lifecycle/persist.rs index 8e49d7a8f1..9db03c31c4 100644 --- a/db/src/lifecycle/persist.rs +++ b/db/src/lifecycle/persist.rs @@ -95,6 +95,8 @@ pub fn persist_chunks( return Ok(None); } + let query_chunk_len = query_chunks.len(); + let time_of_first_write = time_of_first_write.expect("Should have had a first write somewhere"); @@ -104,17 +106,16 @@ pub fn persist_chunks( let summaries = query_chunks .iter() .map(|x| x.summary().expect("Chunk should have summary")); - let key = compute_sort_key(summaries); - let key_str = format!("\"{}\"", key); // for logging + + let sort_key = compute_sort_key(summaries); // build schema let schema = merge_schemas(&query_chunks); - // Cannot move query_chunks as the sort key borrows the column names - let (schema, plan) = ReorgPlanner::new().split_plan( - schema, - query_chunks.iter().map(Arc::clone), - key, + let plan = ReorgPlanner::new().split_plan( + Arc::clone(&schema), + query_chunks, + sort_key.clone(), flush_timestamp, )?; @@ -170,6 +171,7 @@ pub fn persist_chunks( delete_predicates: delete_predicates.clone(), time_of_first_write, time_of_last_write, + sort_key: Some(sort_key.clone()), }; partition_write.create_rub_chunk(None, max_order, metadata, remainder); @@ -193,6 +195,7 @@ pub fn persist_chunks( delete_predicates, time_of_first_write, time_of_last_write, + sort_key: Some(sort_key.clone()), }; let (new_chunk_id, new_chunk) = partition_write.create_rub_chunk( @@ -220,9 +223,9 @@ pub fn persist_chunks( // input rows per second let throughput = (input_rows as u128 * 1_000_000_000) / elapsed.as_nanos(); - info!(input_chunks=query_chunks.len(), + info!(input_chunks=query_chunk_len, input_rows, 
persisted_rows, remainder_rows, - sort_key=%key_str, compaction_took = ?elapsed, + sort_key=%sort_key, compaction_took = ?elapsed, ?max_persistable_timestamp, rows_per_sec=?throughput, "chunk(s) persisted"); diff --git a/db/src/lifecycle/write.rs b/db/src/lifecycle/write.rs index b36da80005..a62b637c91 100644 --- a/db/src/lifecycle/write.rs +++ b/db/src/lifecycle/write.rs @@ -60,6 +60,7 @@ pub(super) fn write_chunk_to_object_store( let table_name = Arc::clone(&addr.table_name); let partition_key = Arc::clone(&addr.partition_key); let chunk_order = chunk.order(); + let sort_key = chunk.sort_key().cloned(); let delete_predicates = chunk.delete_predicates().to_vec(); let (tracker, registration) = db.jobs.register(Job::WriteChunk { @@ -134,6 +135,7 @@ pub(super) fn write_chunk_to_object_store( time_of_first_write, time_of_last_write, chunk_order, + sort_key, }; let written_result = timeout( diff --git a/db/src/load.rs b/db/src/load.rs index 4cf3b722a5..4bcf095d13 100644 --- a/db/src/load.rs +++ b/db/src/load.rs @@ -255,6 +255,7 @@ impl CatalogState for Loader { delete_predicates, time_of_first_write: iox_md.time_of_first_write, time_of_last_write: iox_md.time_of_last_write, + sort_key: iox_md.sort_key, }; partition.insert_object_store_only_chunk( diff --git a/db/src/system_tables/columns.rs b/db/src/system_tables/columns.rs index 8af8dea2b0..d9aea60881 100644 --- a/db/src/system_tables/columns.rs +++ b/db/src/system_tables/columns.rs @@ -12,6 +12,7 @@ use data_types::{ error::ErrorLogger, partition_metadata::{ColumnSummary, PartitionSummary, TableSummary}, }; +use schema::sort::SortKey; use crate::{ catalog::Catalog, @@ -130,7 +131,7 @@ impl IoxSystemTable for ChunkColumnsTable { ( chunk.table_summary(), chunk.detailed_summary(), - chunk.schema(), + chunk.sort_key().cloned(), ) }); @@ -159,7 +160,7 @@ fn chunk_columns_schema() -> SchemaRef { fn assemble_chunk_columns( schema: SchemaRef, - chunk_summaries: Vec<(Arc, DetailedChunkSummary, Arc)>, + chunk_summaries: 
Vec<(Arc, DetailedChunkSummary, Option)>, ) -> Result { // Create an iterator over each column in each table in each chunk // so we can build `chunk_columns` column by column @@ -171,16 +172,14 @@ fn assemble_chunk_columns( let rows = chunk_summaries .iter() - .map(|(table_summary, chunk_summary, schema)| { - let sort_key = schema.sort_key().unwrap_or_default(); - + .map(|(table_summary, chunk_summary, sort_key)| { table_summary .columns .iter() .map(move |column_summary| EachColumn { chunk_summary, column_summary, - column_sort: sort_key.get(&column_summary.name), + column_sort: sort_key.as_ref().and_then(|x| x.get(&column_summary.name)), }) }) .flatten() @@ -300,7 +299,6 @@ mod tests { chunk_metadata::{ChunkColumnSummary, ChunkId, ChunkOrder, ChunkStorage, ChunkSummary}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}, }; - use schema::builder::SchemaBuilder; use schema::sort::SortKey; use time::Time; @@ -367,25 +365,7 @@ mod tests { fn test_assemble_chunk_columns() { let lifecycle_action = None; - let mut sort1 = SortKey::default(); - sort1.push("c2", Default::default()); - sort1.push("c1", Default::default()); - - let schema1 = SchemaBuilder::new() - .field("c1", DataType::Utf8) - .field("c2", DataType::Float64) - .build_with_sort_key(&sort1) - .unwrap(); - - let schema2 = SchemaBuilder::new() - .field("c1", DataType::Float64) - .build() - .unwrap(); - - let schema3 = SchemaBuilder::new() - .field("c3", DataType::Float64) - .build() - .unwrap(); + let sort = SortKey::from_columns(vec!["c2", "c1"]); let summaries = vec![ ( @@ -435,7 +415,7 @@ mod tests { }, ], }, - Arc::new(schema1), + Some(sort), ), ( Arc::new(TableSummary { @@ -466,7 +446,7 @@ mod tests { memory_bytes: 100, }], }, - Arc::new(schema2), + None, ), ( Arc::new(TableSummary { @@ -497,7 +477,7 @@ mod tests { memory_bytes: 200, }], }, - Arc::new(schema3), + None, ), ]; diff --git a/generated_types/protos/influxdata/iox/preserved_catalog/v1/parquet_metadata.proto 
b/generated_types/protos/influxdata/iox/preserved_catalog/v1/parquet_metadata.proto index 55a06b6118..8ca078c809 100644 --- a/generated_types/protos/influxdata/iox/preserved_catalog/v1/parquet_metadata.proto +++ b/generated_types/protos/influxdata/iox/preserved_catalog/v1/parquet_metadata.proto @@ -39,6 +39,9 @@ message IoxMetadata { // Order of this chunk relative to other overlapping chunks. uint32 chunk_order = 10; + + // The sort key of this chunk + SortKey sort_key = 12; } // Partition checkpoint. @@ -81,3 +84,21 @@ message OptionalMinMaxSequence { OptionalUint64 min = 1; uint64 max = 2; } + +message SortKey { + // A sort expression + message Expr { + // The name of the column + string column = 1; + + /// Whether the data is sorted in descending order + bool descending = 2; + + /// Whether the data is sorted with nulls first + bool nulls_first = 3; + } + + repeated Expr expressions = 1; +} + + diff --git a/influxdb_iox/tests/end_to_end_cases/persistence.rs b/influxdb_iox/tests/end_to_end_cases/persistence.rs index f74f7ed3dc..0d36cc92d1 100644 --- a/influxdb_iox/tests/end_to_end_cases/persistence.rs +++ b/influxdb_iox/tests/end_to_end_cases/persistence.rs @@ -259,7 +259,11 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> Chun let partition_key = "cpu"; let table_name = "cpu"; - let lp_lines = vec!["cpu,region=west user=23.2 100"]; + let lp_lines = vec![ + "cpu,region=west,host=a user=23.2 100", + "cpu,region=west,host=b user=34.2 100", + "cpu,region=east,host=c user=54.2 100", + ]; write_client .write_lp(db_name, lp_lines.join("\n"), 0) @@ -310,7 +314,7 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> Chun async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) { let mut client = fixture.flight_client(); - let sql_query = "select region, user, time from cpu"; + let sql_query = "select region, host, user, time from cpu"; let batches = client .perform_query(db_name, sql_query) @@ 
-321,12 +325,35 @@ async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) { .unwrap(); let expected_read_data = vec![ - "+--------+------+--------------------------------+", - "| region | user | time |", - "+--------+------+--------------------------------+", - "| west | 23.2 | 1970-01-01T00:00:00.000000100Z |", - "+--------+------+--------------------------------+", + "+--------+------+------+--------------------------------+", + "| region | host | user | time |", + "+--------+------+------+--------------------------------+", + "| east | c | 54.2 | 1970-01-01T00:00:00.000000100Z |", + "| west | a | 23.2 | 1970-01-01T00:00:00.000000100Z |", + "| west | b | 34.2 | 1970-01-01T00:00:00.000000100Z |", + "+--------+------+------+--------------------------------+", ]; assert_batches_eq!(expected_read_data, &batches); + + let batches = client + .perform_query(db_name, "select column_name, row_count, null_count, min_value, max_value, sort_ordinal from system.chunk_columns") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected_columns = vec![ + "+-------------+-----------+------------+-----------+-----------+--------------+", + "| column_name | row_count | null_count | min_value | max_value | sort_ordinal |", + "+-------------+-----------+------------+-----------+-----------+--------------+", + "| host | 3 | 0 | a | c | 1 |", + "| region | 3 | 0 | east | west | 0 |", + "| time | 3 | 0 | 100 | 100 | 2 |", + "| user | 3 | 0 | 23.2 | 54.2 | |", + "+-------------+-----------+------------+-----------+-----------+--------------+", + ]; + + assert_batches_eq!(expected_columns, &batches); } diff --git a/influxdb_iox/tests/end_to_end_cases/system_tables.rs b/influxdb_iox/tests/end_to_end_cases/system_tables.rs index 1a305d0749..b7d9585cc8 100644 --- a/influxdb_iox/tests/end_to_end_cases/system_tables.rs +++ b/influxdb_iox/tests/end_to_end_cases/system_tables.rs @@ -27,7 +27,7 @@ async fn test_operations() { write_client .write_lp(&db_name1, 
lp_lines.join("\n"), 0) .await - .expect("write succeded"); + .expect("write succeeded"); let chunks = list_chunks(&fixture, &db_name1).await; let chunk_id = chunks[0].id; diff --git a/ingester/src/query.rs b/ingester/src/query.rs index b51d469ac6..e5310f0e38 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -129,6 +129,10 @@ impl QueryChunkMeta for QueryableBatch { merge_record_batch_schemas(&batches) } + fn sort_key(&self) -> Option<&SortKey> { + None + } + fn delete_predicates(&self) -> &[Arc] { self.delete_predicates.as_ref() } @@ -270,16 +274,6 @@ impl QueryChunk for QueryableBatch { Ok(Box::pin(stream)) } - /// Returns true if data of this chunk is sorted - fn is_sorted_on_pk(&self) -> bool { - false - } - - /// Returns the sort key of the chunk if any - fn sort_key(&self) -> Option> { - None - } - /// Returns chunk type fn chunk_type(&self) -> &str { "PersistingBatch" diff --git a/parquet_catalog/src/dump.rs b/parquet_catalog/src/dump.rs index 4a70033c3b..fd2512eb54 100644 --- a/parquet_catalog/src/dump.rs +++ b/parquet_catalog/src/dump.rs @@ -490,6 +490,7 @@ File { chunk_order: ChunkOrder( 1, ), + sort_key: None, }, ), schema: Ok( diff --git a/parquet_catalog/src/rebuild.rs b/parquet_catalog/src/rebuild.rs index 6b2b18f145..fb42ca1dad 100644 --- a/parquet_catalog/src/rebuild.rs +++ b/parquet_catalog/src/rebuild.rs @@ -380,6 +380,7 @@ mod tests { time_of_first_write: Time::from_timestamp_nanos(0), time_of_last_write: Time::from_timestamp_nanos(0), chunk_order: ChunkOrder::new(5).unwrap(), + sort_key: None, }; let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches)); let (path, file_size_bytes, metadata) = storage diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 8346d3a286..249f03d7e1 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -113,6 +113,7 @@ use persistence_windows::{ min_max_sequence::OptionalMinMaxSequence, }; use prost::Message; +use 
schema::sort::{SortKey, SortKeyBuilder}; use schema::{InfluxColumnType, InfluxFieldType, Schema}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{collections::BTreeMap, convert::TryInto, sync::Arc}; @@ -293,6 +294,9 @@ pub struct IoxMetadataOld { /// Order of this chunk relative to other overlapping chunks. pub chunk_order: ChunkOrder, + + /// Sort key of this chunk + pub sort_key: Option, } impl IoxMetadataOld { @@ -332,6 +336,7 @@ impl IoxMetadataOld { .context(IoxMetadataFieldMissingSnafu { field: "partition_checkpoint", })?; + let sequencer_numbers = proto_partition_checkpoint .sequencer_numbers .into_iter() @@ -345,6 +350,7 @@ impl IoxMetadataOld { } }) .collect::>>()?; + let flush_timestamp = decode_timestamp_from_field( proto_partition_checkpoint.flush_timestamp, "partition_checkpoint.flush_timestamp", @@ -363,6 +369,7 @@ impl IoxMetadataOld { .context(IoxMetadataFieldMissingSnafu { field: "database_checkpoint", })?; + let sequencer_numbers = proto_database_checkpoint .sequencer_numbers .into_iter() @@ -376,8 +383,17 @@ impl IoxMetadataOld { } }) .collect::>>()?; + let database_checkpoint = DatabaseCheckpoint::new(sequencer_numbers); + let sort_key = proto_msg.sort_key.map(|proto_key| { + let mut builder = SortKeyBuilder::with_capacity(proto_key.expressions.len()); + for expr in proto_key.expressions { + builder = builder.with_col_opts(expr.column, expr.descending, expr.nulls_first) + } + builder.build() + }); + Ok(Self { creation_timestamp, time_of_first_write, @@ -395,6 +411,7 @@ impl IoxMetadataOld { field: "chunk_order".to_string(), } })?, + sort_key, }) } @@ -442,6 +459,20 @@ impl IoxMetadataOld { .collect(), }; + let sort_key = self + .sort_key + .as_ref() + .map(|key| preserved_catalog::SortKey { + expressions: key + .iter() + .map(|(name, options)| preserved_catalog::sort_key::Expr { + column: name.to_string(), + descending: options.descending, + nulls_first: options.nulls_first, + }) + .collect(), + }); + let proto_msg = 
preserved_catalog::IoxMetadata { version: METADATA_VERSION, creation_timestamp: Some(self.creation_timestamp.date_time().into()), @@ -453,6 +484,7 @@ impl IoxMetadataOld { partition_checkpoint: Some(proto_partition_checkpoint), database_checkpoint: Some(proto_database_checkpoint), chunk_order: self.chunk_order.get(), + sort_key, }; let mut buf = Vec::new(); @@ -1203,6 +1235,7 @@ mod tests { time_of_first_write: Time::from_timestamp(3234, 0), time_of_last_write: Time::from_timestamp(3234, 3456), chunk_order: ChunkOrder::new(5).unwrap(), + sort_key: None, }; let proto_bytes = metadata.to_protobuf().unwrap(); diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index bfa0566da1..92be79aaf6 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -420,6 +420,7 @@ mod tests { time_of_first_write: Time::from_timestamp_nanos(456), time_of_last_write: Time::from_timestamp_nanos(43069346), chunk_order: ChunkOrder::new(5).unwrap(), + sort_key: None, }; // create parquet file @@ -495,6 +496,7 @@ mod tests { time_of_first_write: Time::from_timestamp_nanos(234), time_of_last_write: Time::from_timestamp_nanos(4784), chunk_order: ChunkOrder::new(5).unwrap(), + sort_key: None, }; let (path, _file_size_bytes, _metadata) = storage diff --git a/parquet_file/src/test_utils/generator.rs b/parquet_file/src/test_utils/generator.rs index 9038b5906d..13f50d73cb 100644 --- a/parquet_file/src/test_utils/generator.rs +++ b/parquet_file/src/test_utils/generator.rs @@ -92,6 +92,7 @@ impl ChunkGenerator { database_checkpoint, time_of_first_write: Time::from_timestamp(30, 40), time_of_last_write: Time::from_timestamp(50, 60), + sort_key: None, }; let (record_batches, schema, column_summaries, rows) = match self.config { diff --git a/query/src/frontend.rs b/query/src/frontend.rs index 3c6870b5f1..6976a21baa 100644 --- a/query/src/frontend.rs +++ b/query/src/frontend.rs @@ -6,7 +6,6 @@ pub mod sql; mod test { use std::sync::Arc; - use 
arrow::compute::SortOptions; use datafusion::physical_plan::{ metrics::{self, MetricValue}, ExecutionPlan, ExecutionPlanVisitor, @@ -60,18 +59,10 @@ mod test { #[tokio::test] async fn test_metrics() { let (schema, chunks) = get_test_chunks(); - - let mut sort_key = SortKey::with_capacity(1); - sort_key.push( - "time", - SortOptions { - descending: false, - nulls_first: false, - }, - ); + let sort_key = SortKey::from_columns(vec!["time", "tag1"]); // Use a split plan as it has StreamSplitExec, DeduplicateExec and IOxReadFilternode - let (_, split_plan) = ReorgPlanner::new() + let split_plan = ReorgPlanner::new() .split_plan(schema, chunks, sort_key, 1000) .expect("created compact plan"); diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 3915d3764e..2f4a21fd0e 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use datafusion::logical_plan::{col, lit_timestamp_nano, Expr, LogicalPlan, LogicalPlanBuilder}; -use observability_deps::tracing::{debug, trace}; +use observability_deps::tracing::debug; use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; use crate::{ @@ -112,8 +112,8 @@ impl ReorgPlanner { &self, schema: Arc, chunks: I, - output_sort: SortKey<'_>, - ) -> Result<(Arc, LogicalPlan)> + sort_key: SortKey, + ) -> Result where C: QueryChunk + 'static, I: IntoIterator>, @@ -121,29 +121,13 @@ impl ReorgPlanner { let ScanPlan { plan_builder, provider, - } = self.sorted_scan_plan(schema, chunks)?; - - let mut schema = provider.iox_schema(); - - // Set the sort_key of the schema to the compacted chunk's sort key - // Try to do this only if the sort key changes so we avoid unnecessary schema copies. 
- trace!(input_schema=?schema, "Setting sort key on schema for compact plan"); - if schema - .sort_key() - .map_or(true, |existing_key| existing_key != output_sort) - { - let mut schema_cloned = schema.as_ref().clone(); - schema_cloned.set_sort_key(&output_sort); - schema = Arc::new(schema_cloned); - } - trace!(output_schema=?schema, "Setting sort key on schema for compact plan"); - + } = self.sorted_scan_plan(schema, chunks, sort_key)?; let plan = plan_builder.build().context(BuildingPlanSnafu)?; debug!(table_name=provider.table_name(), plan=%plan.display_indent_schema(), "created compact plan for table"); - Ok((schema, plan)) + Ok(plan) } /// Creates an execution plan for the SPLIT operations which does the following: @@ -194,9 +178,9 @@ impl ReorgPlanner { &self, schema: Arc, chunks: I, - output_sort: SortKey<'_>, + sort_key: SortKey, split_time: i64, - ) -> Result<(Arc, LogicalPlan)> + ) -> Result where C: QueryChunk + 'static, I: IntoIterator>, @@ -204,46 +188,33 @@ impl ReorgPlanner { let ScanPlan { plan_builder, provider, - } = self.sorted_scan_plan(schema, chunks)?; - - let mut schema = provider.iox_schema(); - - // Set output_sort as the sort_key of the schema - // Try to do this only if the sort key changes so we avoid unnecessary schema copies. 
- trace!(input_schema=?schema, "Setting sort key on schema for split plan"); - if schema - .sort_key() - .map_or(true, |existing_key| existing_key != output_sort) - { - let mut schema_cloned = schema.as_ref().clone(); - schema_cloned.set_sort_key(&output_sort); - schema = Arc::new(schema_cloned); - } - trace!(output_schema=?schema, "Setting sort key on schema for split plan"); + } = self.sorted_scan_plan(schema, chunks, sort_key)?; // time <= split_time let split_expr = col(TIME_COLUMN_NAME).lt_eq(lit_timestamp_nano(split_time)); let plan = plan_builder.build().context(BuildingPlanSnafu)?; - let plan = make_stream_split(plan, split_expr); debug!(table_name=provider.table_name(), plan=%plan.display_indent_schema(), "created split plan for table"); - Ok((schema, plan)) + Ok(plan) } /// Creates a scan plan for the given set of chunks. + /// /// Output data of the scan will be deduplicated sorted if `sort=true` on /// the optimal sort order of the chunks' PK columns (tags and time). /// - /// The optimal sort order is computed based on the PK columns cardinality - /// that will be best for RLE encoding. 
- /// /// Refer to query::provider::build_scan_plan for the detail of the plan /// - fn sorted_scan_plan(&self, schema: Arc, chunks: I) -> Result> + fn sorted_scan_plan( + &self, + schema: Arc, + chunks: I, + sort_key: SortKey, + ) -> Result> where C: QueryChunk + 'static, I: IntoIterator>, @@ -256,12 +227,11 @@ impl ReorgPlanner { let table_name = &table_name; // Prepare the plan for the table - let mut builder = ProviderBuilder::new(table_name, schema); - // Tell the scan of this provider to sort its output on the chunks' PK - builder.ensure_pk_sort(); - - // There are no predicates in these plans, so no need to prune them - builder = builder.add_no_op_pruner(); + let mut builder = ProviderBuilder::new(table_name, schema) + // There are no predicates in these plans, so no need to prune them + .add_no_op_pruner() + // Tell the scan of this provider to sort its output on the chunks' PK + .with_sort_key(sort_key); for chunk in chunks { // check that it is consistent with this table_name @@ -278,6 +248,7 @@ impl ReorgPlanner { let provider = builder .build() .context(CreatingProviderSnafu { table_name })?; + let provider = Arc::new(provider); // Scan all columns @@ -301,10 +272,10 @@ struct ScanPlan { #[cfg(test)] mod test { - use arrow::compute::SortOptions; use arrow_util::assert_batches_eq; use datafusion_util::{test_collect, test_collect_partition}; use schema::merge::SchemaMerger; + use schema::sort::SortKeyBuilder; use crate::{ exec::{Executor, ExecutorType}, @@ -413,23 +384,12 @@ mod test { let (schema, chunks) = get_test_chunks().await; - let mut sort_key = SortKey::with_capacity(2); - sort_key.push( - "tag1", - SortOptions { - descending: true, - nulls_first: true, - }, - ); - sort_key.push( - "time", - SortOptions { - descending: false, - nulls_first: false, - }, - ); + let sort_key = SortKeyBuilder::with_capacity(2) + .with_col_opts("tag1", true, true) + .with_col_opts(TIME_COLUMN_NAME, false, false) + .build(); - let (_, compact_plan) = 
ReorgPlanner::new() + let compact_plan = ReorgPlanner::new() .compact_plan(schema, chunks, sort_key) .expect("created compact plan"); @@ -453,14 +413,14 @@ mod test { "+-----------+------------+------+--------------------------------+", "| field_int | field_int2 | tag1 | time |", "+-----------+------------+------+--------------------------------+", - "| 100 | | AL | 1970-01-01T00:00:00.000000050Z |", - "| 70 | | CT | 1970-01-01T00:00:00.000000100Z |", + "| 1000 | 1000 | WA | 1970-01-01T00:00:00.000028Z |", + "| 50 | 50 | VT | 1970-01-01T00:00:00.000210Z |", + "| 70 | 70 | UT | 1970-01-01T00:00:00.000220Z |", "| 1000 | | MT | 1970-01-01T00:00:00.000001Z |", "| 5 | | MT | 1970-01-01T00:00:00.000005Z |", "| 10 | | MT | 1970-01-01T00:00:00.000007Z |", - "| 70 | 70 | UT | 1970-01-01T00:00:00.000220Z |", - "| 50 | 50 | VT | 1970-01-01T00:00:00.000210Z |", - "| 1000 | 1000 | WA | 1970-01-01T00:00:00.000028Z |", + "| 70 | | CT | 1970-01-01T00:00:00.000000100Z |", + "| 100 | | AL | 1970-01-01T00:00:00.000000050Z |", "+-----------+------------+------+--------------------------------+", ]; @@ -474,17 +434,13 @@ mod test { // the operator is tested in its own module. 
let (schema, chunks) = get_test_chunks().await; - let mut sort_key = SortKey::with_capacity(1); - sort_key.push( - "time", - SortOptions { - descending: false, - nulls_first: false, - }, - ); + let sort_key = SortKeyBuilder::with_capacity(2) + .with_col_opts("time", false, false) + .with_col_opts("tag1", false, true) + .build(); // split on 1000 should have timestamps 1000, 5000, and 7000 - let (_, split_plan) = ReorgPlanner::new() + let split_plan = ReorgPlanner::new() .split_plan(schema, chunks, sort_key, 1000) .expect("created compact plan"); @@ -519,16 +475,16 @@ mod test { let batches1 = test_collect_partition(physical_plan, 1).await; - // Sorted on state (tag1) ASC and time + // Sorted on time let expected = vec![ "+-----------+------------+------+-----------------------------+", "| field_int | field_int2 | tag1 | time |", "+-----------+------------+------+-----------------------------+", "| 5 | | MT | 1970-01-01T00:00:00.000005Z |", "| 10 | | MT | 1970-01-01T00:00:00.000007Z |", - "| 70 | 70 | UT | 1970-01-01T00:00:00.000220Z |", - "| 50 | 50 | VT | 1970-01-01T00:00:00.000210Z |", "| 1000 | 1000 | WA | 1970-01-01T00:00:00.000028Z |", + "| 50 | 50 | VT | 1970-01-01T00:00:00.000210Z |", + "| 70 | 70 | UT | 1970-01-01T00:00:00.000220Z |", "+-----------+------------+------+-----------------------------+", ]; diff --git a/query/src/lib.rs b/query/src/lib.rs index 1192efb4f2..b23f236c55 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -34,6 +34,7 @@ pub mod statistics; pub mod util; pub use exec::context::{DEFAULT_CATALOG, DEFAULT_SCHEMA}; +use schema::sort::SortKeyBuilder; /// Trait for an object (designed to be a Chunk) which can provide /// metadata @@ -44,6 +45,9 @@ pub trait QueryChunkMeta: Sized { /// return a reference to the summary of the data held in this chunk fn schema(&self) -> Arc; + /// return a reference to the sort key if any + fn sort_key(&self) -> Option<&SortKey>; + /// return a reference to delete predicates of the chunk fn 
delete_predicates(&self) -> &[Arc]; @@ -207,12 +211,6 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync { selection: Selection<'_>, ) -> Result; - /// Returns true if data of this chunk is sorted - fn is_sorted_on_pk(&self) -> bool; - - /// Returns the sort key of the chunk if any - fn sort_key(&self) -> Option>; - /// Returns chunk type which is either MUB, RUB, OS fn chunk_type(&self) -> &str; @@ -233,6 +231,10 @@ where self.as_ref().schema() } + fn sort_key(&self) -> Option<&SortKey> { + self.as_ref().sort_key() + } + fn delete_predicates(&self) -> &[Arc] { let pred = self.as_ref().delete_predicates(); debug!(?pred, "Delete predicate in QueryChunkMeta"); @@ -251,19 +253,14 @@ where chunks.iter().all(|c| c.summary().is_some()) } -pub fn compute_sort_key_for_chunks<'a, C>(schema: &'a Schema, chunks: &'a [C]) -> SortKey<'a> +pub fn compute_sort_key_for_chunks(schema: &Schema, chunks: &[C]) -> SortKey where C: QueryChunkMeta, { if !chunks_have_stats(chunks) { // chunks have not enough stats, return its pk that is // sorted lexicographically but time column always last - let pk = schema.primary_key(); - let mut sort_key = SortKey::with_capacity(pk.len()); - for col in pk { - sort_key.push(col, Default::default()) - } - sort_key + SortKey::from_columns(schema.primary_key()) } else { let summaries = chunks .iter() @@ -276,7 +273,7 @@ where /// /// In the absence of more precise information, this should yield a /// good ordering for RLE compression -pub fn compute_sort_key<'a>(summaries: impl Iterator) -> SortKey<'a> { +pub fn compute_sort_key<'a>(summaries: impl Iterator) -> SortKey { let mut cardinalities: HashMap<&str, u64> = Default::default(); for summary in summaries { for column in &summary.columns { @@ -298,11 +295,13 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator) - // Sort by (cardinality, column_name) to have deterministic order if same cardinality cardinalities.sort_by_key(|x| (x.1, x.0)); - let mut key = 
SortKey::with_capacity(cardinalities.len() + 1); + let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1); for (col, _) in cardinalities { - key.push(col, Default::default()) + builder = builder.with_col(col) } - key.push(TIME_COLUMN_NAME, Default::default()); + builder = builder.with_col(TIME_COLUMN_NAME); + + let key = builder.build(); trace!(computed_sort_key=?key, "Value of sort key from compute_sort_key"); diff --git a/query/src/provider.rs b/query/src/provider.rs index a89ed5045f..8ff4a0923c 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -19,12 +19,12 @@ use datafusion::{ }; use observability_deps::tracing::{debug, trace}; use predicate::{Predicate, PredicateBuilder}; -use schema::{merge::SchemaMerger, sort::SortKey, Schema}; +use schema::{merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema}; use crate::{ chunks_have_stats, compute_sort_key_for_chunks, util::{arrow_sort_key_exprs, df_physical_expr}, - QueryChunk, + QueryChunk, QueryChunkMeta, }; use snafu::{ResultExt, Snafu}; @@ -84,11 +84,6 @@ impl From for DataFusionError { } } -enum ColumnType { - PrimaryKey, - DeletePredicate, -} - /// Something that can prune chunks based on their metadata pub trait ChunkPruner: Sync + Send + std::fmt::Debug { /// prune `chunks`, if possible, based on predicate. 
@@ -110,8 +105,7 @@ pub struct ProviderBuilder { schema: Arc, chunk_pruner: Option>>, chunks: Vec>, - /// ensure the output is sorted on the pk columns (in an optimal order computed based on their cardinality) - ensure_pk_sort: bool, + sort_key: Option, } impl ProviderBuilder { @@ -121,13 +115,16 @@ impl ProviderBuilder { schema, chunk_pruner: None, chunks: Vec::new(), - ensure_pk_sort: false, // never sort the output unless explicitly specified + sort_key: None, } } - /// Requests the output of the scan sorted - pub fn ensure_pk_sort(&mut self) { - self.ensure_pk_sort = true; + /// Produce sorted output + pub fn with_sort_key(self, sort_key: SortKey) -> Self { + Self { + sort_key: Some(sort_key), + ..self + } } /// Add a new chunk to this provider @@ -175,7 +172,7 @@ impl ProviderBuilder { chunk_pruner, table_name: self.table_name, chunks: self.chunks, - ensure_pk_sort: self.ensure_pk_sort, + sort_key: self.sort_key, }) } } @@ -191,10 +188,10 @@ pub struct ChunkTableProvider { iox_schema: Arc, /// Something that can prune chunks chunk_pruner: Arc>, - // The chunks + /// The chunks chunks: Vec>, - /// ensure the output is sorted on the pk columns (in an optimal order computed based on their cardinality) - ensure_pk_sort: bool, + /// The sort key if any + sort_key: Option, } impl ChunkTableProvider { @@ -212,11 +209,6 @@ impl ChunkTableProvider { pub fn table_name(&self) -> &str { self.table_name.as_ref() } - - /// Requests the output of the scan sorted - pub fn ensure_pk_sort(&mut self) { - self.ensure_pk_sort = true; - } } #[async_trait] @@ -259,7 +251,7 @@ impl TableProvider for ChunkTableProvider { // Figure out the schema of the requested output let scan_schema = match projection { - Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)), + Some(indices) => Arc::new(self.iox_schema.select_by_indices(indices)), None => Arc::clone(&self.iox_schema), }; @@ -278,13 +270,13 @@ impl TableProvider for ChunkTableProvider { scan_schema, chunks, 
predicate, - self.ensure_pk_sort, + self.sort_key.clone(), )?; Ok(plan) } - /// Filter pushdown specificiation + /// Filter pushdown specification fn supports_filter_pushdown( &self, _filter: &Expr, @@ -296,13 +288,13 @@ impl TableProvider for ChunkTableProvider { #[derive(Clone, Debug, Default)] /// A deduplicater that deduplicate the duplicated data during scan execution pub(crate) struct Deduplicater { - // a vector of a vector of overlapped chunks + /// a vector of a vector of overlapped chunks pub overlapped_chunks_set: Vec>>, - // a vector of non-overlapped chunks each have duplicates in itself + /// a vector of non-overlapped chunks each have duplicates in itself pub in_chunk_duplicates_chunks: Vec>, - // a vector of non-overlapped and non-duplicates chunks + /// a vector of non-overlapped and non-duplicates chunks pub no_duplicates_chunks: Vec>, } @@ -354,7 +346,7 @@ impl Deduplicater { /// │ │ │ /// │ │ │ /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ - /// │ DeduplicateExec │ │ DeduplicateExec │ │ SortExec │ <-- This is added if sort_output = true + /// │ DeduplicateExec │ │ DeduplicateExec │ │ SortExec │ <-- This is added if output_sort_key.is_some() /// └─────────────────┘ └─────────────────┘ │ (Optional) │ /// ▲ ▲ └─────────────────┘ /// │ │ ▲ @@ -381,6 +373,11 @@ impl Deduplicater { /// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│ /// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │ /// └─────────────────┘ └─────────────────┘ └─────────────────┘ + /// + /// # Panic + /// + /// Panics if output_sort_key is `Some` and doesn't contain all primary key columns + /// ///``` pub(crate) fn build_scan_plan( &mut self, @@ -388,15 +385,8 @@ impl Deduplicater { output_schema: Arc, chunks: Vec>, predicate: Predicate, - sort_output: bool, + output_sort_key: Option, ) -> Result> { - // Initialize an empty sort key - let mut output_sort_key = SortKey::with_capacity(0); - if sort_output { - // Compute the output sort key which is the super key 
of chunks' keys base on their data cardinality - output_sort_key = compute_sort_key_for_chunks(&output_schema, chunks.as_ref()); - } - // find overlapped chunks and put them into the right group self.split_overlapped_chunks(chunks.to_vec())?; @@ -407,9 +397,9 @@ impl Deduplicater { let mut non_duplicate_plans = Self::build_plans_for_non_duplicates_chunks( Arc::clone(&table_name), Arc::clone(&output_schema), - chunks.to_owned(), + chunks, predicate, - &output_sort_key, + output_sort_key.as_ref(), )?; plans.append(&mut non_duplicate_plans); } else { @@ -418,14 +408,33 @@ impl Deduplicater { no_duplicates_chunks=?self.no_duplicates_chunks.len(), "Chunks after classifying: "); + let pk_schema = Self::compute_pk_schema(&chunks); + let dedup_sort_key = match &output_sort_key { + Some(sort_key) => { + // Technically we only require that the sort order is prefixed by + // the primary key, in order for deduplication to work correctly + assert_eq!( + pk_schema.len(), + sort_key.len(), + "output_sort_key must be same length as primary key" + ); + assert!( + pk_schema.is_sorted_on_pk(sort_key), + "output_sort_key must contain primary key" + ); + sort_key.clone() + } + None => compute_sort_key_for_chunks(&pk_schema, chunks.as_ref()), + }; + // Go over overlapped set, build deduplicate plan for each vector of overlapped chunks for overlapped_chunks in self.overlapped_chunks_set.to_vec() { plans.push(Self::build_deduplicate_plan_for_overlapped_chunks( Arc::clone(&table_name), Arc::clone(&output_schema), - overlapped_chunks.to_owned(), + overlapped_chunks, predicate.clone(), - &output_sort_key, + &dedup_sort_key, )?); } @@ -434,9 +443,9 @@ impl Deduplicater { plans.push(Self::build_deduplicate_plan_for_chunk_with_duplicates( Arc::clone(&table_name), Arc::clone(&output_schema), - chunk_with_duplicates.to_owned(), + chunk_with_duplicates, predicate.clone(), - &output_sort_key, + &dedup_sort_key, )?); } @@ -446,7 +455,7 @@ impl Deduplicater { Arc::clone(&output_schema), 
self.no_duplicates_chunks.to_vec(), predicate, - &output_sort_key, + output_sort_key.as_ref(), )?; plans.append(&mut non_duplicate_plans); } @@ -464,13 +473,13 @@ impl Deduplicater { _ => Arc::new(UnionExec::new(plans)), }; - if sort_output { + if let Some(sort_key) = &output_sort_key { // Sort preserving merge the sorted plans // Note that even if the plan is a single plan (aka no UnionExec on top), // we still need to add this SortPreservingMergeExec because: // 1. It will provide a sorted signal(through Datafusion's Distribution::UnspecifiedDistribution) // 2. And it will not do anything extra if the input is one partition so won't affect performance - let sort_exprs = arrow_sort_key_exprs(&output_sort_key, &plan.schema()); + let sort_exprs = arrow_sort_key_exprs(sort_key, &plan.schema()); plan = Arc::new(SortPreservingMergeExec::new(sort_exprs, plan)); } @@ -554,7 +563,7 @@ impl Deduplicater { output_schema: Arc, chunks: Vec>, // These chunks are identified overlapped predicate: Predicate, - output_sort_key: &SortKey<'_>, + sort_key: &SortKey, ) -> Result> { // Note that we may need to sort/deduplicate based on tag // columns which do not appear in the output @@ -570,14 +579,6 @@ impl Deduplicater { let pk_schema = Self::compute_pk_schema(&chunks); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); - // Compute the output sort key for these chunks - let sort_key = if !output_sort_key.is_empty() { - output_sort_key.to_owned() - } else { - compute_sort_key_for_chunks(&output_schema, chunks.as_ref()) - }; - trace!(sort_key=?sort_key, "sort key for the input chunks"); - trace!( ?output_schema, ?pk_schema, @@ -594,7 +595,7 @@ impl Deduplicater { Arc::clone(&input_schema), Arc::clone(chunk), predicate.clone(), - &sort_key, + Some(sort_key), ) }) .collect(); @@ -605,7 +606,7 @@ impl Deduplicater { let plan = UnionExec::new(sorted_chunk_plans?); // Now (sort) merge the already sorted chunks - let sort_exprs = arrow_sort_key_exprs(&sort_key, 
&plan.schema()); + let sort_exprs = arrow_sort_key_exprs(sort_key, &plan.schema()); let plan = Arc::new(SortPreservingMergeExec::new( sort_exprs.clone(), @@ -649,19 +650,13 @@ impl Deduplicater { output_schema: Arc, chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, - output_sort_key: &SortKey<'_>, + sort_key: &SortKey, ) -> Result> { let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); // Compute the output sort key for this chunk let chunks = vec![chunk]; - let mut sort_key = if !output_sort_key.is_empty() { - output_sort_key.to_owned() - } else { - compute_sort_key_for_chunks(&output_schema, &chunks) - }; - trace!(sort_key=?sort_key,chunk_id=?chunks[0].id(), "Computed the sort key for the input chunk"); // Create the 2 bottom nodes IOxReadFilterNode and SortExec let plan = Self::build_sort_plan_for_read_filter( @@ -669,22 +664,12 @@ impl Deduplicater { Arc::clone(&input_schema), Arc::clone(&chunks[0]), predicate, - &sort_key, + Some(sort_key), )?; - // The sort key of this chunk might only the subset of the super sort key - if !output_sort_key.is_empty() { - // First get the chunk pk columns - let schema = chunks[0].schema(); - let key_columns = schema.primary_key(); - - // Now get the key subset of the super key that includes the chunk's pk columns - sort_key = output_sort_key.selected_sort_key(key_columns.clone()); - } - - // Add DeduplicateExc + // Add DeduplicateExec // Sort exprs for the deduplication - let sort_exprs = arrow_sort_key_exprs(&sort_key, &plan.schema()); + let sort_exprs = arrow_sort_key_exprs(sort_key, &plan.schema()); trace!(Sort_Exprs=?sort_exprs, chunk_ID=?chunks[0].id(), "Sort Expression for the deduplicate node of chunk"); let plan = Self::add_deduplicate_node(sort_exprs, plan); @@ -761,6 +746,12 @@ impl Deduplicater { /// See the description of function build_scan_plan to see why the sort may be needed /// ```text /// 
┌─────────────────┐ + /// │ ProjectionExec │ + /// │ (optional) │ + /// └─────────────────┘ + /// ▲ + /// │ + /// ┌─────────────────┐ /// │ SortExec │ /// │ (optional) │ /// └─────────────────┘ @@ -785,32 +776,55 @@ impl Deduplicater { output_schema: Arc, chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, // This is the select predicate of the query - output_sort_key: &SortKey<'_>, + sort_key: Option<&SortKey>, ) -> Result> { // Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode // This is needed because columns in select query may not include them yet - let mut input_schema = Arc::clone(&output_schema); + + // Construct a schema to pass to IOxReadFilterNode that contains: // - // Cols of sort keys (which is a part of the primary key) - if !output_sort_key.is_empty() { - // build schema of PK - let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); - // Merge it in the output_schema - input_schema = Self::compute_input_schema(&input_schema, &pk_schema); + // 1. all columns present in the output schema + // 2. all columns present in the sort key that are present in the chunk + // 3. all columns present in any delete predicates on the chunk + // + // Any columns present in the schema but not in the chunk, will be padded with NULLs + // by IOxReadFilterNode + // + // 1. ensures that the schema post-projection matches output_schema + // 2. ensures that all columns necessary to perform the sort are present + // 3. 
ensures that all columns necessary to evaluate the delete predicates are present + let mut schema_merger = SchemaMerger::new().merge(&output_schema).unwrap(); + + let chunk_schema = chunk.schema(); + + // Cols of sort key + if let Some(key) = sort_key { + for (t, field) in chunk_schema.iter() { + // Ignore columns present in sort key but not in chunk + if key.get(field.name()).is_some() { + schema_merger.merge_field(field, t).unwrap(); + } + } } - // + // Cols of delete predicates if chunk.has_delete_predicates() { - // build schema of columns in delete expression - let pred_schema = Self::compute_delete_predicate_schema(&[Arc::clone(&chunk)]); - // merge that column into the schema - input_schema = Self::compute_input_schema(&input_schema, &pred_schema); + for col in chunk.delete_predicate_columns() { + let idx = chunk_schema + .find_index_of(col) + .expect("delete predicate missing column"); + + let (t, field) = chunk_schema.field(idx); + schema_merger.merge_field(field, t).unwrap(); + } } + let input_schema = schema_merger.build(); + // Create the bottom node IOxReadFilterNode for this chunk let mut input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), - input_schema, + Arc::new(input_schema), vec![Arc::clone(&chunk)], predicate, )); @@ -821,6 +835,7 @@ impl Deduplicater { .iter() .map(|pred| Arc::new(pred.as_ref().clone().into())) .collect(); + debug!(?del_preds, "Chunk delete predicates"); let negated_del_expr_val = Predicate::negated_expr(&del_preds[..]); if let Some(negated_del_expr) = negated_del_expr_val { @@ -837,8 +852,8 @@ impl Deduplicater { } // Add the sort operator, SortExec, if needed - if !output_sort_key.is_empty() { - input = Self::build_sort_plan(chunk, input, output_sort_key)? + if let Some(key) = sort_key { + input = Self::build_sort_plan(chunk, input, key)? 
} // Add a projection operator to return only schema of the operator above this in the plan @@ -851,7 +866,7 @@ impl Deduplicater { fn build_sort_plan( chunk: Arc, input: Arc, - output_sort_key: &SortKey<'_>, + output_sort_key: &SortKey, ) -> Result> { // output_sort_key cannot be empty if output_sort_key.is_empty() { @@ -863,8 +878,8 @@ impl Deduplicater { // Check to see if the plan is sorted on the subset of the output_sort_key let sort_key = chunk.sort_key(); if let Some(chunk_sort_key) = sort_key { - if let Some(merge_key) = SortKey::try_merge_key(output_sort_key, &chunk_sort_key) { - if merge_key == *output_sort_key { + if let Some(merge_key) = SortKey::try_merge_key(output_sort_key, chunk_sort_key) { + if merge_key == output_sort_key { // the chunk is already sorted on the subset of the o_sort_key, // no need to resort it trace!(ChunkID=?chunk.id(), "Chunk is sorted and no need the sort operator"); @@ -885,24 +900,9 @@ impl Deduplicater { "Chunk is not yet sorted and will get sorted in build_sort_plan"); } - // Build the chunk's sort key that is a subset of the output_sort_key - // - // First get the chunk pk columns - let schema = chunk.schema(); - let key_columns = schema.primary_key(); - - // Now get the key subset of the super key that includes the chunk's pk columns - let chunk_sort_key = output_sort_key.selected_sort_key(key_columns.clone()); - - debug!(chunk_type=?chunk.chunk_type(), - chunk_ID=?chunk.id(), - pk_columns=?key_columns, - sort_key=?chunk_sort_key, - "Chunk is getting sorted"); - // Build arrow sort expression for the chunk sort key let input_schema = input.schema(); - let sort_exprs = arrow_sort_key_exprs(&chunk_sort_key, &input_schema); + let sort_exprs = arrow_sort_key_exprs(output_sort_key, &input_schema); trace!(Sort_Exprs=?sort_exprs, Chunk_ID=?chunk.id(), "Sort Expression for the sort operator of chunk"); @@ -919,15 +919,9 @@ impl Deduplicater { output_schema: Arc, chunk: Arc, // This chunk is identified having no duplicates 
predicate: Predicate, - output_sort_key: &SortKey<'_>, + sort_key: Option<&SortKey>, ) -> Result> { - Self::build_sort_plan_for_read_filter( - table_name, - output_schema, - chunk, - predicate, - output_sort_key, - ) + Self::build_sort_plan_for_read_filter(table_name, output_schema, chunk, predicate, sort_key) } /// Return either @@ -966,14 +960,13 @@ impl Deduplicater { output_schema: Arc, chunks: Vec>, // These chunks is identified having no duplicates predicate: Predicate, - output_sort_key: &SortKey<'_>, + output_sort_key: Option<&SortKey>, ) -> Result>> { let mut plans: Vec> = vec![]; // Only chunks without delete predicates should be in this one IOxReadFilterNode // if there is no chunk, we still need to return a plan - if (output_sort_key.is_empty() && Self::no_delete_predicates(&chunks)) || chunks.is_empty() - { + if (output_sort_key.is_none() && Self::no_delete_predicates(&chunks)) || chunks.is_empty() { plans.push(Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), output_schema, @@ -1007,35 +1000,24 @@ impl Deduplicater { .all(|chunk| chunk.delete_predicates().is_empty()) } - /// Find the columns needed in a given ColumnType across schemas - /// - /// Note by the time we get down here, we have already checked - /// the chunks for compatible schema, so we use unwrap (perhaps - /// famous last words, but true at time of writing) - fn compute_schema_for_column_type(chunks: &[Arc], col_type: ColumnType) -> Arc { + /// Find the columns needed in chunks' primary keys across schemas + fn compute_pk_schema(chunks: &[Arc]) -> Arc { let mut schema_merger = SchemaMerger::new(); for chunk in chunks { let chunk_schema = chunk.schema(); - let cols = match col_type { - ColumnType::PrimaryKey => chunk_schema.primary_key(), - ColumnType::DeletePredicate => chunk.delete_predicate_columns(), - }; - let chunk_cols_schema = chunk_schema.select_by_names(&cols).unwrap(); - schema_merger = schema_merger.merge(&chunk_cols_schema).unwrap(); + for (column_type, field) in 
chunk_schema.iter() { + if matches!( + column_type, + Some(InfluxColumnType::Tag | InfluxColumnType::Timestamp) + ) { + schema_merger + .merge_field(field, column_type) + .expect("schema mismatch"); + } + } } - let cols_schema = schema_merger.build(); - Arc::new(cols_schema) - } - - /// Find the columns needed in chunks' delete predicates across schemas - fn compute_delete_predicate_schema(chunks: &[Arc]) -> Arc { - Self::compute_schema_for_column_type(chunks, ColumnType::DeletePredicate) - } - - /// Find the columns needed in chunks' primary keys across schemas - fn compute_pk_schema(chunks: &[Arc]) -> Arc { - Self::compute_schema_for_column_type(chunks, ColumnType::PrimaryKey) + Arc::new(schema_merger.build()) } /// Find columns required to read from each scan: the output columns + the @@ -1149,9 +1131,7 @@ mod test { .with_five_rows_of_data(), ); - let mut sort_key = SortKey::with_capacity(2); - sort_key.with_col("tag1"); - sort_key.with_col(TIME_COLUMN_NAME); + let sort_key = SortKey::from_columns(vec!["tag1", TIME_COLUMN_NAME]); // IOx scan operator let input: Arc = Arc::new(IOxReadFilterNode::new( @@ -1224,11 +1204,7 @@ mod test { .with_five_rows_of_data(), ); - let mut sort_key = SortKey::with_capacity(3); - sort_key.with_col("tag1"); - sort_key.with_col("tag2"); - sort_key.with_col("tag3"); - sort_key.with_col(TIME_COLUMN_NAME); + let sort_key = SortKey::from_columns(vec!["tag1", "tag2", "tag3", TIME_COLUMN_NAME]); // IOx scan operator let input: Arc = Arc::new(IOxReadFilterNode::new( @@ -1301,10 +1277,7 @@ mod test { .with_five_rows_of_data(), ); - let mut sort_key = SortKey::with_capacity(3); - sort_key.with_col("tag1"); - sort_key.with_col("tag2"); - sort_key.with_col(TIME_COLUMN_NAME); + let sort_key = SortKey::from_columns(vec!["tag1", "tag2", TIME_COLUMN_NAME]); // Datafusion schema of the chunk let schema = chunk.schema(); @@ -1314,7 +1287,7 @@ mod test { schema, Arc::clone(&chunk), Predicate::default(), - &sort_key, + Some(&sort_key), ) .unwrap(); 
let batch = test_collect(sort_plan).await; @@ -1341,26 +1314,9 @@ mod test { let chunk1 = Arc::new( TestChunk::new("t") .with_id(1) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag2", - Some("AL"), - Some("MA"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1369,26 +1325,9 @@ mod test { let chunk2 = Arc::new( TestChunk::new("t") .with_id(2) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag2", - Some("AL"), - Some("MA"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1416,7 +1355,7 @@ mod test { ]; assert_batches_eq!(&expected, &raw_data(&chunks).await); - let output_sort_key = SortKey::with_capacity(0); + let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( Arc::from("t"), schema, @@ -1450,26 +1389,9 @@ mod test { let chunk1 = Arc::new( TestChunk::new("t") .with_id(1) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag2", - Some("AL"), - Some("MA"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + 
.with_tag_column("tag1") + .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1478,26 +1400,9 @@ mod test { let chunk2 = Arc::new( TestChunk::new("t") .with_id(2) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag2", - Some("AL"), - Some("MA"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1529,8 +1434,7 @@ mod test { .build() .unwrap(); - // With the provided stats, the computed sort key will be (tag1, tag2, time) - let output_sort_key = SortKey::with_capacity(0); + let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( Arc::from("t"), Arc::new(schema), @@ -1564,26 +1468,9 @@ mod test { let chunk1 = Arc::new( TestChunk::new("t") .with_id(1) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag2", - Some("AL"), - Some("MA"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1592,19 +1479,8 @@ mod test { let chunk2 = Arc::new( TestChunk::new("t") .with_id(2) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + 
.with_tag_column("tag1") .with_i64_field_column("other_field_int") .with_five_rows_of_data(), ); @@ -1613,19 +1489,8 @@ mod test { let chunk3 = Arc::new( TestChunk::new("t") .with_id(3) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag1") .with_i64_field_column("other_field_int") .with_five_rows_of_data(), ); @@ -1662,8 +1527,7 @@ mod test { .build() .unwrap(); - // With the provided stats, the computed sort key will be (tag2, tag1, time) - let output_sort_key = SortKey::with_capacity(0); + let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( Arc::from("t"), Arc::new(schema), @@ -1701,26 +1565,9 @@ mod test { let chunk1 = Arc::new( TestChunk::new("t") .with_id(1) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag2", - Some("AL"), - Some("MA"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1729,26 +1576,9 @@ mod test { let chunk2 = Arc::new( TestChunk::new("t") .with_id(2) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag3", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag1", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag3") + .with_tag_column("tag1") 
.with_i64_field_column("field_int") .with_five_rows_of_data(), ); @@ -1757,19 +1587,8 @@ mod test { let chunk3 = Arc::new( TestChunk::new("t") .with_id(3) - .with_time_column_with_full_stats( - Some(5), - Some(7000), - 5, - Some(NonZeroU64::new(5).unwrap()), - ) - .with_tag_column_with_full_stats( - "tag3", - Some("AL"), - Some("MT"), - 5, - Some(NonZeroU64::new(3).unwrap()), - ) + .with_time_column() + .with_tag_column("tag3") .with_i64_field_column("field_int") .with_i64_field_column("field_int2") .with_five_rows_of_data(), @@ -1811,7 +1630,7 @@ mod test { ]; assert_batches_eq!(&expected, &raw_data(&chunks).await); - let output_sort_key = SortKey::with_capacity(0); + let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( Arc::from("t"), Arc::new(schema), @@ -1890,7 +1709,7 @@ mod test { let mut deduplicator = Deduplicater::new(); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), false) + .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // No duplicates so no sort at all. 
The data will stay in their original order @@ -1947,7 +1766,7 @@ mod test { let mut deduplicator = Deduplicater::new(); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), false) + .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // Data must be sorted on (tag1, time) and duplicates removed @@ -2026,7 +1845,7 @@ mod test { Arc::new(schema), chunks, Predicate::default(), - false, + None, ) .unwrap(); let batch = test_collect(plan).await; @@ -2121,7 +1940,7 @@ mod test { let mut deduplicator = Deduplicater::new(); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), false) + .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // Two overlapped chunks will be sort merged on (tag1, time) with duplicates removed @@ -2269,7 +2088,7 @@ mod test { // Create scan plan whose output data is only partially sorted let mut deduplicator = Deduplicater::new(); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), false) + .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // Final data is partially sorted with duplicates removed. 
Detailed: @@ -2425,10 +2244,18 @@ mod test { ]; assert_batches_eq!(&expected, &raw_data(&chunks).await); + let sort_key = compute_sort_key_for_chunks(&schema, &chunks); let mut deduplicator = Deduplicater::new(); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), true) + .build_scan_plan( + Arc::from("t"), + schema, + chunks, + Predicate::default(), + Some(sort_key), + ) .unwrap(); + let batch = test_collect(plan).await; // Final data must be sorted let expected = vec![ @@ -2454,64 +2281,6 @@ mod test { assert_batches_eq!(&expected, &batch); } - #[tokio::test] - async fn test_sorted_metadata() { - test_helpers::maybe_start_logging(); - let mut key = SortKey::default(); - key.push("time", Default::default()); - - let chunk = Arc::new( - TestChunk::new("t") - .with_id(1) - .with_time_column() - .with_i64_field_column("field_int") - .with_one_row_of_data() - .with_sort_key(&key), - ); - - let schema = chunk.schema(); - assert!(schema.sort_key().is_some()); - - let mut provider = ProviderBuilder::new("t", Arc::clone(&schema)) - .add_no_op_pruner() - .add_chunk(chunk) - .build() - .unwrap(); - - provider.ensure_pk_sort(); - - let plan = provider.scan(&None, &[], None).await.unwrap(); - let batches = test_collect(plan).await; - - for batch in &batches { - // TODO: schema output lacks sort key (#3214) - //assert_eq!(batch.schema(), schema.as_arrow()) - - let schema: Schema = batch.schema().try_into().unwrap(); - for field_idx in 0..schema.len() { - let (influx_column_type, field) = schema.field(field_idx); - assert!( - influx_column_type.is_some(), - "Schema for field {}: {:?}, {:?}", - field_idx, - influx_column_type, - field, - ); - } - } - - assert_batches_eq!( - &[ - "+-----------+-----------------------------+", - "| field_int | time |", - "+-----------+-----------------------------+", - "| 1000 | 1970-01-01T00:00:00.000001Z |", - "+-----------+-----------------------------+", - ], - &batches - ); - } - fn 
chunk_ids(group: &[Arc]) -> String { let ids = group .iter() diff --git a/query/src/test.rs b/query/src/test.rs index 08b8154f39..7bcef82822 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -235,6 +235,9 @@ pub struct TestChunk { /// Order of this chunk relative to other overlapping chunks. order: ChunkOrder, + + /// The sort key of this chunk + sort_key: Option, } /// Implements a method for adding a column with default stats @@ -310,6 +313,7 @@ impl TestChunk { predicate_match: Default::default(), delete_predicates: Default::default(), order: ChunkOrder::MIN, + sort_key: None, } } @@ -858,13 +862,11 @@ impl TestChunk { } /// Set the sort key for this chunk - pub fn with_sort_key(mut self, sort_key: &SortKey<'_>) -> Self { - let mut merger = SchemaMerger::new(); - merger = merger - .merge(self.schema.as_ref()) - .expect("merging was successful"); - self.schema = Arc::new(merger.build_with_sort_key(sort_key)); - self + pub fn with_sort_key(self, sort_key: SortKey) -> Self { + Self { + sort_key: Some(sort_key), + ..self + } } /// Returns all columns of the table @@ -929,16 +931,6 @@ impl QueryChunk for TestChunk { Ok(stream_from_batches(batches)) } - /// Returns true if data of this chunk is sorted - fn is_sorted_on_pk(&self) -> bool { - false - } - - /// Returns the sort key of the chunk if any - fn sort_key(&self) -> Option> { - None - } - fn chunk_type(&self) -> &str { "Test Chunk" } @@ -999,6 +991,10 @@ impl QueryChunkMeta for TestChunk { Arc::clone(&self.schema) } + fn sort_key(&self) -> Option<&SortKey> { + self.sort_key.as_ref() + } + // return a reference to delete predicates of the chunk fn delete_predicates(&self) -> &[Arc] { let pred = &self.delete_predicates; diff --git a/query/src/util.rs b/query/src/util.rs index a2dcef5e04..886bebc732 100644 --- a/query/src/util.rs +++ b/query/src/util.rs @@ -55,22 +55,23 @@ pub fn arrow_pk_sort_exprs( } pub fn arrow_sort_key_exprs( - sort_key: &SortKey<'_>, + sort_key: &SortKey, input_schema: &ArrowSchema, ) 
-> Vec { - let mut sort_exprs = vec![]; - for (key, options) in sort_key.iter() { - let expr = physical_col(key, input_schema).expect("sort key column in schema"); - sort_exprs.push(PhysicalSortExpr { - expr, - options: SortOptions { - descending: options.descending, - nulls_first: options.nulls_first, - }, - }); - } - - sort_exprs + sort_key + .iter() + .flat_map(|(key, options)| { + // Skip over missing columns + let expr = physical_col(key, input_schema).ok()?; + Some(PhysicalSortExpr { + expr, + options: SortOptions { + descending: options.descending, + nulls_first: options.nulls_first, + }, + }) + }) + .collect() } /// Build a datafusion physical expression from its logical one diff --git a/query_tests/src/table_schema.rs b/query_tests/src/table_schema.rs index 968daa67e4..71ca414e37 100644 --- a/query_tests/src/table_schema.rs +++ b/query_tests/src/table_schema.rs @@ -17,6 +17,7 @@ async fn run_table_schema_test_case( selection: Selection<'_>, table_name: &str, expected_schema: Schema, + expected_sort_key: Option<&SortKey>, ) where D: DbSetup, { @@ -48,6 +49,8 @@ async fn run_table_schema_test_case( expected_schema, actual_schema ); + + assert_eq!(chunk.sort_key(), expected_sort_key); } } assert!( @@ -59,7 +62,7 @@ async fn run_table_schema_test_case( #[tokio::test] async fn list_schema_cpu_all_mub() { - // we expect columns to come out in lexographic order by name + // we expect columns to come out in lexicographic order by name let expected_schema = SchemaBuilder::new() .tag("region") .timestamp() @@ -72,23 +75,22 @@ async fn list_schema_cpu_all_mub() { Selection::All, "cpu", expected_schema, + None, ) .await; } #[tokio::test] async fn list_schema_cpu_all_rub() { - // we expect columns to come out in lexographic order by name + // we expect columns to come out in lexicographic order by name // The schema of RUB includes sort key - let mut sort_key = SortKey::with_capacity(2); - sort_key.push("region", Default::default()); - sort_key.push(TIME_COLUMN_NAME, 
Default::default()); + let sort_key = SortKey::from_columns(vec!["region", TIME_COLUMN_NAME]); let expected_schema = SchemaBuilder::new() .tag("region") .timestamp() .field("user", DataType::Float64) - .build_with_sort_key(&sort_key) + .build() .unwrap(); run_table_schema_test_case( @@ -96,6 +98,7 @@ async fn list_schema_cpu_all_rub() { Selection::All, "cpu", expected_schema, + Some(&sort_key), ) .await; } @@ -103,15 +106,13 @@ async fn list_schema_cpu_all_rub() { #[tokio::test] async fn list_schema_cpu_all_rub_set_sort_key() { // The schema of RUB includes sort key - let mut sort_key = SortKey::with_capacity(2); - sort_key.push("region", Default::default()); - sort_key.push(TIME_COLUMN_NAME, Default::default()); + let sort_key = SortKey::from_columns(vec!["region", TIME_COLUMN_NAME]); let expected_schema = SchemaBuilder::new() .tag("region") .timestamp() .field("user", DataType::Float64) - .build_with_sort_key(&sort_key) + .build() .unwrap(); run_table_schema_test_case( @@ -119,6 +120,7 @@ async fn list_schema_cpu_all_rub_set_sort_key() { Selection::All, "cpu", expected_schema, + Some(&sort_key), ) .await; @@ -127,7 +129,7 @@ async fn list_schema_cpu_all_rub_set_sort_key() { #[tokio::test] async fn list_schema_disk_all() { - // we expect columns to come out in lexographic order by name + // we expect columns to come out in lexicographic order by name let expected_schema = SchemaBuilder::new() .field("bytes", DataType::Int64) .tag("region") @@ -140,6 +142,7 @@ async fn list_schema_disk_all() { Selection::All, "disk", expected_schema, + None, ) .await; } @@ -160,20 +163,21 @@ async fn list_schema_cpu_selection() { selection, "cpu", expected_schema, + None, ) .await; } #[tokio::test] async fn list_schema_disk_selection() { - // we expect columns to come out in lexographic order by name + // we expect columns to come out in lexicographic order by name let expected_schema = SchemaBuilder::new() .timestamp() .field("bytes", DataType::Int64) .build() .unwrap(); - // Pick 
an order that is not lexographic + // Pick an order that is not lexicographic let selection = Selection::Some(&["time", "bytes"]); run_table_schema_test_case( @@ -181,13 +185,14 @@ async fn list_schema_disk_selection() { selection, "disk", expected_schema, + None, ) .await; } #[tokio::test] async fn list_schema_location_all() { - // we expect columns to come out in lexographic order by name + // we expect columns to come out in lexicographic order by name let expected_schema = SchemaBuilder::new() .field("count", DataType::UInt64) .timestamp() @@ -200,197 +205,7 @@ async fn list_schema_location_all() { Selection::All, "restaurant", expected_schema, + None, ) .await; } - -#[tokio::test] -async fn test_set_sort_key_valid_same_order() { - // Build the expected schema with sort key - let mut sort_key = SortKey::with_capacity(3); - sort_key.push("tag1", Default::default()); - sort_key.push("time", Default::default()); - sort_key.push("tag2", Default::default()); - - let expected_schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build_with_sort_key(&sort_key) - .unwrap(); - - // The same schema without sort key - let mut schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build() - .unwrap(); - - schema.set_sort_key(&sort_key); - - assert_eq!( - expected_schema, schema, - "Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n", - expected_schema, schema - ); -} - -#[tokio::test] -async fn test_set_sort_key_valid_different_order() { - // Build the expected schema with sort key "time, tag2, tag1" - let mut sort_key = SortKey::with_capacity(3); - sort_key.push("time", Default::default()); - sort_key.push("tag2", Default::default()); - sort_key.push("tag1", Default::default()); - - let expected_schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - 
.tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build_with_sort_key(&sort_key) - .unwrap(); - - // The same schema without sort key - let mut schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build() - .unwrap(); - - schema.set_sort_key(&sort_key); - - assert_eq!( - expected_schema, schema, - "Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n", - expected_schema, schema - ); -} - -#[tokio::test] -async fn test_set_sort_key_valid_subset() { - // Build the expected schema with sort key "time, tag1" - let mut sort_key = SortKey::with_capacity(2); - sort_key.push("time", Default::default()); - sort_key.push("tag1", Default::default()); - - let expected_schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build_with_sort_key(&sort_key) - .unwrap(); - - // The same schema without sort key - let mut schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build() - .unwrap(); - - // set sort key for it - schema.set_sort_key(&sort_key); - - assert_eq!( - expected_schema, schema, - "Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n", - expected_schema, schema - ); -} - -#[tokio::test] -async fn test_set_sort_key_valid_subset_of_fully_set() { - // Build sort key "tag1, time, tag2" - let mut sort_key = SortKey::with_capacity(3); - sort_key.push("tag1", Default::default()); - sort_key.push("time", Default::default()); - sort_key.push("tag2", Default::default()); - - // The schema with sort key - let mut schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build_with_sort_key(&sort_key) - .unwrap(); - - 
// reset sort key to "tag2, time" - let mut sort_key = SortKey::with_capacity(2); - sort_key.push("tag2", Default::default()); - sort_key.push("time", Default::default()); - - schema.set_sort_key(&sort_key); - - // Expected schema with "tag2, time" as sort key - let expected_schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build_with_sort_key(&sort_key) - .unwrap(); - - assert_eq!( - expected_schema, schema, - "Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n", - expected_schema, schema - ); -} - -#[tokio::test] -async fn test_set_sort_key_invalid_not_exist() { - // Build the expected schema with sort key "time" - let mut sort_key = SortKey::with_capacity(1); - sort_key.push("time", Default::default()); - - let expected_schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build_with_sort_key(&sort_key) - .unwrap(); - - // The same schema without sort key - let mut schema = SchemaBuilder::new() - .tag("tag1") - .timestamp() - .tag("tag2") - .field("field_int", DataType::Int64) - .field("field_float", DataType::Float64) - .build() - .unwrap(); - - // Nuild sort key that include valid "time" and invalid "no_tag" - let mut sort_key = SortKey::with_capacity(2); - sort_key.push("time", Default::default()); - // invalid column - sort_key.push("not_tag", Default::default()); - - // The invalid key will be ignored in this function - schema.set_sort_key(&sort_key); - - assert_eq!( - expected_schema, schema, - "Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n", - expected_schema, schema - ); -} diff --git a/schema/src/builder.rs b/schema/src/builder.rs index 79f6d4ca86..3821feeb42 100644 --- a/schema/src/builder.rs +++ b/schema/src/builder.rs @@ -3,8 +3,6 @@ use std::convert::TryInto; use arrow::datatypes::{DataType as ArrowDataType, Field as 
ArrowField}; use snafu::{ResultExt, Snafu}; -use crate::sort::SortKey; - use super::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; /// Database schema creation / validation errors. @@ -145,20 +143,11 @@ impl SchemaBuilder { /// assert_eq!(influxdb_column_type, Some(InfluxColumnType::Timestamp)); /// ``` pub fn build(&mut self) -> Result { - self.build_with_sort_key(&Default::default()) - } - - pub fn build_with_sort_key(&mut self, sort_key: &SortKey<'_>) -> Result { assert!(!self.finished, "build called multiple times"); self.finished = true; - Schema::new_from_parts( - self.measurement.take(), - self.fields.drain(..), - sort_key, - false, - ) - .context(ValidatingSchemaSnafu) + Schema::new_from_parts(self.measurement.take(), self.fields.drain(..), false) + .context(ValidatingSchemaSnafu) } /// Internal helper method to add a column definition diff --git a/schema/src/lib.rs b/schema/src/lib.rs index 9a035cdc05..642297047b 100644 --- a/schema/src/lib.rs +++ b/schema/src/lib.rs @@ -13,11 +13,10 @@ use arrow::datatypes::{ }; use hashbrown::HashSet; +use crate::sort::SortKey; use selection::Selection; use snafu::{OptionExt, Snafu}; -use crate::sort::{ColumnSort, SortKey}; - /// The name of the timestamp column in the InfluxDB datamodel pub const TIME_COLUMN_NAME: &str = "time"; @@ -111,7 +110,6 @@ impl TryFrom for Schema { const MEASUREMENT_METADATA_KEY: &str = "iox::measurement::name"; const COLUMN_METADATA_KEY: &str = "iox::column::type"; -const COLUMN_SORT_METADATA_KEY: &str = "iox::column::sort"; impl Schema { /// Create a new Schema wrapper over the schema @@ -161,7 +159,6 @@ impl Schema { pub(crate) fn new_from_parts( measurement: Option, fields: impl Iterator)>, - sort_key: &SortKey<'_>, sort_columns: bool, ) -> Result { let mut metadata = HashMap::new(); @@ -170,17 +167,9 @@ impl Schema { metadata.insert(MEASUREMENT_METADATA_KEY.to_string(), measurement); } - let mut sort_ordinals = Vec::with_capacity(sort_key.len()); - let mut fields: Vec = 
fields .map(|(mut field, column_type)| { - match sort_key.get(field.name()) { - Some(sort) => { - sort_ordinals.push(sort.sort_ordinal); - set_field_metadata(&mut field, column_type, Some(sort)) - } - None => set_field_metadata(&mut field, column_type, None), - } + set_field_metadata(&mut field, column_type); field }) .collect(); @@ -194,77 +183,14 @@ impl Schema { let record = ArrowSchemaRef::new(ArrowSchema::new_with_metadata(fields, metadata)).try_into()?; - // This must be after validation in case of duplicate columns - sort_ordinals.sort_unstable(); - - for (idx, ordinal) in sort_ordinals.iter().enumerate() { - if idx != *ordinal { - return Err(Error::SortColumnNotFound { - column_name: sort_key.get_index(idx).unwrap().0.to_string(), - }); - } - } - - if sort_ordinals.len() != sort_key.len() { - return Err(Error::SortColumnNotFound { - column_name: sort_key - .get_index(sort_ordinals.len()) - .unwrap() - .0 - .to_string(), - }); - } - Ok(record) } - /// Set the order of sort columns to the specified `sort_key` - pub fn set_sort_key(&mut self, sort_key: &SortKey<'_>) { - let fields = self.inner.fields(); - - // create a new_fields that are the fields with their sort keys set - let new_fields = fields + /// Returns true if the sort_key includes all primary key cols + pub fn is_sorted_on_pk(&self, sort_key: &SortKey) -> bool { + self.primary_key() .iter() - .map(|field| { - let mut new_field = field.clone(); - let mut meta = std::collections::BTreeMap::new(); - if let Some(sort) = sort_key.get(field.name()) { - // New sort key - meta.insert(COLUMN_SORT_METADATA_KEY.to_string(), sort.to_string()); - } - // Keep other meta data - if let Some(metadata) = field.metadata() { - for (key, value) in metadata { - if key.ne(&COLUMN_SORT_METADATA_KEY.to_string()) { - meta.insert(key.clone(), value.clone()); - } - } - } - new_field.set_metadata(Some(meta)); - - new_field - }) - .collect(); - - let new_meta = self.inner.metadata().clone(); - let new_schema = 
ArrowSchema::new_with_metadata(new_fields, new_meta); - self.inner = Arc::new(new_schema); - } - - /// Returns true of the sort_key include all primary key cols - pub fn is_sorted_on_pk(&self) -> bool { - if let Some(sort_key) = self.sort_key() { - let key_columns = self.primary_key(); - for key_col in key_columns { - if sort_key.get(key_col).is_none() { - return false; // pk col is not part of the sort key - } - } - true - } else { - // not sorted yet - false - } + .all(|col| sort_key.get(*col).is_some()) } /// Provide a reference to the underlying Arrow Schema object @@ -425,36 +351,6 @@ impl Schema { self.select(Selection::Some(selection)) } - /// Returns the sort key if any - pub fn sort_key(&self) -> Option> { - // Find all the sorted columns - let mut columns: Vec<_> = self - .inner - .fields() - .iter() - .enumerate() - .flat_map(|(idx, field)| Some((idx, get_sort(field)?))) - .collect(); - - columns.sort_unstable_by_key(|(_, sort)| sort.sort_ordinal); - - let mut sort_key = SortKey::with_capacity(columns.len()); - for (idx, (column_idx, sort)) in columns.into_iter().enumerate() { - // If the schema has been projected with only some of the columns - // the sort key may be truncated - if sort.sort_ordinal != idx { - break; - } - - sort_key.push(self.inner.field(column_idx).name().as_str(), sort.options) - } - - if !sort_key.is_empty() { - return Some(sort_key); - } - None - } - /// Return columns used for the "primary key" in this table. /// /// Currently this relies on the InfluxDB data model annotations @@ -501,32 +397,14 @@ pub(crate) fn get_influx_type(field: &ArrowField) -> Option { .ok() } -/// Gets the column sort for a field -pub(crate) fn get_sort(field: &ArrowField) -> Option { - field - .metadata() - .as_ref()? - .get(COLUMN_SORT_METADATA_KEY)? 
- .parse() - .ok() -} - /// Sets the metadata for a field - replacing any existing metadata -pub(crate) fn set_field_metadata( - field: &mut ArrowField, - column_type: Option, - sort: Option, -) { +pub(crate) fn set_field_metadata(field: &mut ArrowField, column_type: Option) { let mut metadata = std::collections::BTreeMap::new(); if let Some(column_type) = column_type { metadata.insert(COLUMN_METADATA_KEY.to_string(), column_type.to_string()); } - if let Some(sort) = sort { - metadata.insert(COLUMN_SORT_METADATA_KEY.to_string(), sort.to_string()); - } - field.set_metadata(Some(metadata)) } @@ -732,13 +610,9 @@ macro_rules! assert_column_eq { #[cfg(test)] mod test { - use arrow::compute::SortOptions; - use InfluxColumnType::*; use InfluxFieldType::*; - use crate::merge::SchemaMerger; - use super::{builder::SchemaBuilder, *}; fn make_field( @@ -1158,115 +1032,11 @@ mod test { ); } - #[test] - fn test_sort() { - let mut sort_key = SortKey::with_capacity(3); - sort_key.push("tag4", Default::default()); - sort_key.push("tag3", Default::default()); - sort_key.push("tag2", Default::default()); - sort_key.push("tag1", Default::default()); - sort_key.push(TIME_COLUMN_NAME, Default::default()); - - let schema1 = SchemaBuilder::new() - .influx_field("the_field", String) - .tag("tag1") - .tag("tag2") - .tag("tag3") - .tag("tag4") - .timestamp() - .measurement("the_measurement") - .build_with_sort_key(&sort_key) - .unwrap(); - - let projected = schema1 - .select_by_names(&["tag4", "tag2", "tag3", "time"]) - .unwrap(); - - let projected_key: Vec<_> = projected.sort_key().unwrap().iter().map(|x| *x.0).collect(); - - let m1 = SchemaMerger::new().merge(&schema1).unwrap().build(); - - let m2 = SchemaMerger::new() - .merge(&schema1) - .unwrap() - .build_with_sort_key(&sort_key); - - assert_eq!(schema1.sort_key().unwrap(), sort_key); - assert_eq!(m1.sort_key(), None); - assert_eq!(m2.sort_key().unwrap(), sort_key); - assert_eq!(projected_key, vec!["tag4", "tag3", "tag2"]) - } - - 
#[test] - fn test_sort_missing_column() { - let mut sort_key = SortKey::with_capacity(3); - sort_key.push( - "the_field", - SortOptions { - descending: true, - nulls_first: false, - }, - ); - sort_key.push("a", Default::default()); - sort_key.push(TIME_COLUMN_NAME, Default::default()); - - // Verify missing columns are detected - let err = SchemaBuilder::new() - .influx_field("the_field", String) - .measurement("the_measurement") - .build_with_sort_key(&sort_key) - .unwrap_err(); - - assert!(matches!( - err, - builder::Error::ValidatingSchema { - source: Error::SortColumnNotFound { - column_name - } - } if &column_name == "a" - )); - - // Verify duplicate columns don't break truncation - let err = SchemaBuilder::new() - .influx_field("the_field", String) - .influx_field("a", String) - .timestamp() - .timestamp() - .measurement("the_measurement") - .build_with_sort_key(&sort_key) - .unwrap_err(); - - assert!(matches!( - err, - builder::Error::ValidatingSchema { - source: Error::DuplicateColumnName { .. 
} - } - )); - - // Verify sort key gaps are detected - let err = SchemaBuilder::new() - .influx_field("a", String) - .influx_field("the_field", String) - .measurement("the_measurement") - .build_with_sort_key(&sort_key) - .unwrap_err(); - - assert!(matches!(err, builder::Error::ValidatingSchema { - source: Error::SortColumnNotFound { - column_name - } - } if &column_name == "time" )); - } - #[test] fn test_is_sort_on_pk() { // Sort key the same as pk - let mut sort_key = SortKey::with_capacity(3); - sort_key.with_col("tag4"); - sort_key.with_col("tag3"); - sort_key.with_col("tag2"); - sort_key.with_col("tag1"); - sort_key.with_col(TIME_COLUMN_NAME); + let sort_key = + SortKey::from_columns(vec!["tag4", "tag3", "tag2", "tag1", TIME_COLUMN_NAME]); let schema = SchemaBuilder::new() .influx_field("the_field", String) @@ -1276,29 +1046,13 @@ mod test { .tag("tag4") .timestamp() .measurement("the_measurement") - .build_with_sort_key(&sort_key) + .build() .unwrap(); - assert!(schema.is_sorted_on_pk()); + assert!(schema.is_sorted_on_pk(&sort_key)); // Sort key does not include all pk cols - let mut sort_key = SortKey::with_capacity(3); - sort_key.with_col("tag3"); - sort_key.with_col("tag1"); - sort_key.with_col(TIME_COLUMN_NAME); + let sort_key = SortKey::from_columns(vec!["tag3", "tag1", TIME_COLUMN_NAME]); - let schema = SchemaBuilder::new() - .influx_field("the_field", String) - .tag("tag1") - .tag("tag2") - .tag("tag3") - .tag("tag4") - .timestamp() - .measurement("the_measurement") - .build_with_sort_key(&sort_key) - .unwrap(); - assert!(!schema.is_sorted_on_pk()); - - // No sort key let schema = SchemaBuilder::new() .influx_field("the_field", String) .tag("tag1") @@ -1309,19 +1063,10 @@ mod test { .measurement("the_measurement") .build() .unwrap(); - assert!(!schema.is_sorted_on_pk()); - - // No PK, no sort key - let schema = SchemaBuilder::new() - .influx_field("the_field", String) - .measurement("the_measurement") - .build() - .unwrap(); - 
assert!(!schema.is_sorted_on_pk()); + assert!(!schema.is_sorted_on_pk(&sort_key)); // No PK, sort key on non pk - let mut sort_key = SortKey::with_capacity(3); - sort_key.with_col("the_field"); + let sort_key = SortKey::from_columns(vec!["the_field"]); let schema = SchemaBuilder::new() .influx_field("the_field", String) @@ -1331,8 +1076,8 @@ mod test { .tag("tag4") .timestamp() .measurement("the_measurement") - .build_with_sort_key(&sort_key) + .build() .unwrap(); - assert!(!schema.is_sorted_on_pk()); + assert!(!schema.is_sorted_on_pk(&sort_key)); } } diff --git a/schema/src/merge.rs b/schema/src/merge.rs index aa7b85443b..b87446973b 100644 --- a/schema/src/merge.rs +++ b/schema/src/merge.rs @@ -8,8 +8,6 @@ use hashbrown::hash_map::RawEntryMut; use hashbrown::HashMap; use snafu::Snafu; -use crate::sort::SortKey; - use super::{InfluxColumnType, Schema}; /// Database schema creation / validation errors. @@ -137,7 +135,7 @@ impl SchemaMerger { Ok(self) } - fn merge_field( + pub fn merge_field( &mut self, field: &Field, column_type: Option, @@ -187,18 +185,10 @@ impl SchemaMerger { } /// Returns the schema that was built, the columns are always sorted in lexicographic order - pub fn build(self) -> Schema { - self.build_with_sort_key(&Default::default()) - } - - /// Returns the schema that was built, the columns are always sorted in lexicographic order - /// - /// Additionally specifies a sort key for the data - pub fn build_with_sort_key(mut self, sort_key: &SortKey<'_>) -> Schema { + pub fn build(mut self) -> Schema { Schema::new_from_parts( self.measurement.take(), self.fields.drain().map(|x| x.1), - sort_key, true, ) .expect("failed to build merged schema") diff --git a/schema/src/sort.rs b/schema/src/sort.rs index 0612b11dbc..afab6996c8 100644 --- a/schema/src/sort.rs +++ b/schema/src/sort.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{fmt::Display, str::FromStr}; use arrow::compute::SortOptions; @@ -68,22 +69,79 @@ impl std::fmt::Display for ColumnSort { } } 
-#[derive(Debug, Default, Eq, PartialEq, Clone)] -pub struct SortKey<'a> { - columns: IndexMap<&'a str, SortOptions>, +#[derive(Debug, Default)] +pub struct SortKeyBuilder { + columns: IndexMap, SortOptions>, } -impl<'a> SortKey<'a> { - /// Create a new empty sort key that can store `capacity` columns without allocating +impl SortKeyBuilder { + pub fn new() -> Self { + Self::default() + } + pub fn with_capacity(capacity: usize) -> Self { Self { columns: IndexMap::with_capacity(capacity), } } - /// Adds a new column to the end of this sort key - pub fn push(&mut self, column: &'a str, options: SortOptions) { - self.columns.insert(column, options); + pub fn with_col(self, column: impl Into>) -> Self { + self.with_col_sort_opts(column, Default::default()) + } + + /// Helper to insert col with specified sort options into sort key + pub fn with_col_opts( + self, + col: impl Into>, + descending: bool, + nulls_first: bool, + ) -> Self { + self.with_col_sort_opts( + col, + SortOptions { + descending, + nulls_first, + }, + ) + } + + pub fn with_col_sort_opts(mut self, col: impl Into>, options: SortOptions) -> Self { + self.columns.insert(col.into(), options); + self + } + + pub fn build(self) -> SortKey { + SortKey { + columns: Arc::new(self.columns), + } + } +} + +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct SortKey { + columns: Arc, SortOptions>>, +} + +impl SortKey { + /// Create a new empty sort key + pub fn empty() -> Self { + SortKey { + columns: Default::default(), + } + } + + /// Create a new sort key from the provided columns + pub fn from_columns(columns: C) -> Self + where + C: IntoIterator, + I: Into>, + { + let iter = columns.into_iter(); + let mut builder = SortKeyBuilder::with_capacity(iter.size_hint().0); + for c in iter { + builder = builder.with_col(c); + } + builder.build() } /// Gets the ColumnSort for a given column name @@ -96,10 +154,8 @@ impl<'a> SortKey<'a> { } /// Gets the column for a given index - pub fn get_index(&self, idx: usize) -> 
Option<(&'a str, SortOptions)> { - self.columns - .get_index(idx) - .map(|(col, options)| (*col, *options)) + pub fn get_index(&self, idx: usize) -> Option<(&Arc, &SortOptions)> { + self.columns.get_index(idx) } /// Return the index of the given column and its sort option. Return None otherwise. @@ -116,7 +172,7 @@ impl<'a> SortKey<'a> { } /// Returns an iterator over the columns in this key - pub fn iter(&self) -> Iter<'_, &'a str, SortOptions> { + pub fn iter(&self) -> Iter<'_, Arc, SortOptions> { self.columns.iter() } @@ -130,23 +186,6 @@ impl<'a> SortKey<'a> { self.columns.is_empty() } - /// Returns a subset of the sort key that includes only the given columns - pub fn selected_sort_key(&self, select_keys: Vec<&str>) -> SortKey<'a> { - let keys: IndexMap<&'a str, SortOptions> = self - .columns - .iter() - .filter_map(|(col, options)| { - if select_keys.iter().any(|key| key == col) { - Some((*col, *options)) - } else { - None - } - }) - .collect(); - - SortKey { columns: keys } - } - /// Returns merge key of the 2 given keys if one covers the other. Returns None otherwise. /// Key1 is said to cover key2 if key2 is a subset and in the same order of key1. 
/// Examples: @@ -165,28 +204,19 @@ impl<'a> SortKey<'a> { /// super key of (a, b, c) and any of { b, a), (c, a), (c, b), (b, a, c), (b, c, a), (c, a, b), (c, b, a) } is None /// /// Note that the last column in the sort key must be time - pub fn try_merge_key(key1: &SortKey<'a>, key2: &SortKey<'a>) -> Option<SortKey<'a>> { + pub fn try_merge_key<'a>(key1: &'a SortKey, key2: &'a SortKey) -> Option<&'a SortKey> { if key1.is_empty() || key2.is_empty() { panic!("Sort key cannot be empty"); } - let key1 = key1.clone(); - let key2 = key2.clone(); - // Verify if time column in the sort key - match key1.columns.get_index_of(TIME_COLUMN_NAME) { - None => panic!("Time column is not included in the sort key {:#?}", key1), - Some(idx) => { - if idx < key1.len() - 1 { - panic!("Time column is not last in the sort key {:#?}", key1) - } - } - } - match key2.columns.get_index_of(TIME_COLUMN_NAME) { - None => panic!("Time column is not included in the sort key {:#?}", key2), - Some(idx) => { - if idx < key2.len() - 1 { - panic!("Time column is not last in the sort key {:#?}", key2) + for key in [&key1, &key2] { + match key.columns.get_index_of(TIME_COLUMN_NAME) { + None => panic!("Time column is not included in the sort key {:#?}", key), + Some(idx) => { + if idx < key.len() - 1 { + panic!("Time column is not last in the sort key {:#?}", key) + } } } } @@ -199,7 +229,7 @@ impl<'a> SortKey<'a> { // Go over short key and check its right-order availability in the long key let mut prev_long_idx: Option<usize> = None; - for (col, sort_options) in &short_key.columns { + for (col, sort_options) in short_key.columns.iter() { if let Some(long_idx) = long_key.find_index(col, sort_options) { match prev_long_idx { None => prev_long_idx = Some(long_idx), @@ -222,29 +252,13 @@ impl<'a> SortKey<'a> { // Reach here means the long key is the super key of the sort one Some(long_key) } - - /// Helper to insert col with default sort options into sort key - pub fn with_col(&mut self, col: &'a str) { - self.push(col, 
Default::default()); - } - - /// Helper to insert col with specified sort options into sort key - pub fn with_col_opts(&mut self, col: &'a str, descending: bool, nulls_first: bool) { - self.push( - col, - SortOptions { - descending, - nulls_first, - }, - ); - } } // Produces a human-readable representation of a sort key that looks like: // // "host, region DESC, env NULLS FIRST, time" // -impl<'a> Display for SortKey<'a> { +impl Display for SortKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { for (i, (name, options)) in self.columns.iter().enumerate() { write!(f, "{}", name)?; @@ -305,10 +319,7 @@ mod tests { #[test] fn test_basic() { - let mut key = SortKey::with_capacity(3); - key.push("a", Default::default()); - key.push("c", Default::default()); - key.push("b", Default::default()); + let key = SortKey::from_columns(vec!["a", "c", "b"]); assert_eq!(key.len(), 3); assert!(!key.is_empty()); @@ -339,48 +350,50 @@ mod tests { #[test] fn test_sort_key_eq() { - let mut key1 = SortKey::with_capacity(1); - key1.with_col("a"); + let key1 = SortKey::from_columns(vec!["a"]); - let mut key1_2 = SortKey::with_capacity(2); - key1_2.with_col("a"); - key1_2.with_col_opts("b", true, false); + let key1_2 = SortKeyBuilder::with_capacity(2) + .with_col("a") + .with_col_opts("b", true, false) + .build(); - let key2 = SortKey::with_capacity(2); + let key2 = SortKey::empty(); // different keys assert_ne!(key1, key2); assert_ne!(key1_2, key2); assert_ne!(key1, key1_2); - let mut key3 = SortKey::with_capacity(1); - key3.with_col("a"); + let key3 = SortKey::from_columns(vec!["a"]); - let mut key3_2 = SortKey::with_capacity(2); - key3_2.with_col("a"); - key3_2.with_col_opts("b", true, false); + let key3_2 = SortKeyBuilder::with_capacity(2) + .with_col("a") + .with_col_opts("b", true, false) + .build(); // same assert_eq!(key1, key3); assert_eq!(key1_2, key3_2); - let mut key4 = SortKey::with_capacity(1); - key4.with_col("aa"); + let key4 = 
SortKey::from_columns(vec!["aa"]); - let mut key4_2 = SortKey::with_capacity(2); - key4_2.with_col("aa"); - key4_2.with_col_opts("bb", true, false); + let key4_2 = SortKeyBuilder::with_capacity(2) + .with_col("aa") + .with_col_opts("bb", true, false) + .build(); // different key, same value assert_ne!(key1, key4); assert_ne!(key1_2, key4_2); - let mut key5 = SortKey::with_capacity(1); - key5.with_col_opts("a", true, true); + let key5 = SortKeyBuilder::with_capacity(1) + .with_col_opts("a", true, true) + .build(); - let mut key5_2 = SortKey::with_capacity(2); - key5_2.with_col_opts("a", true, true); - key5_2.with_col_opts("b", false, true); + let key5_2 = SortKeyBuilder::with_capacity(2) + .with_col_opts("a", true, true) + .with_col_opts("b", false, true) + .build(); // same key, different value assert_ne!(key1, key5); @@ -390,39 +403,41 @@ mod tests { // Note that the last column must be TIME_COLUMN_NAME to avoid panicking #[test] fn test_super_sort_key() { - // key (a) with default sort options (false, true) - let mut key_a = SortKey::with_capacity(1); let a = TIME_COLUMN_NAME; - key_a.with_col(a); + // key (a) with default sort options (false, true) + let key_a = SortKey::from_columns(vec![a]); + // key (a) with explicitly defined sort options - let mut key_a_2 = SortKey::with_capacity(1); - key_a_2.with_col_opts(a, true, false); + let key_a_2 = SortKeyBuilder::with_capacity(1) + .with_col_opts(TIME_COLUMN_NAME, true, false) + .build(); // super key of (a) and (a) is (a) let merge_key = SortKey::try_merge_key(&key_a, &key_a).unwrap(); - assert_eq!(merge_key, key_a); + assert_eq!(merge_key, &key_a); let merge_key = SortKey::try_merge_key(&key_a_2, &key_a_2).unwrap(); - assert_eq!(merge_key, key_a_2); + assert_eq!(merge_key, &key_a_2); // (a,b) let b = TIME_COLUMN_NAME; - let mut key_ab = SortKey::with_capacity(2); - key_ab.with_col("a"); - key_ab.with_col(b); - let mut key_ab_2 = SortKey::with_capacity(2); - key_ab_2.with_col_opts("a", true, false); - 
key_ab_2.with_col_opts(b, false, false); + let key_ab = SortKey::from_columns(vec!["a", TIME_COLUMN_NAME]); + let key_ab_2 = SortKeyBuilder::with_capacity(2) + .with_col_opts("a", true, false) + .with_col_opts(b, false, false) + .build(); + //(b) - let mut key_b = SortKey::with_capacity(1); - key_b.with_col(b); - let mut key_b_2 = SortKey::with_capacity(1); - key_b_2.with_col_opts(b, false, false); + let key_b = SortKey::from_columns(vec![b]); + + let key_b_2 = SortKeyBuilder::with_capacity(1) + .with_col_opts(b, false, false) + .build(); // super key of (a, b) and (b) is (a, b) let merge_key = SortKey::try_merge_key(&key_ab, &key_b).unwrap(); - assert_eq!(merge_key, key_ab); + assert_eq!(merge_key, &key_ab); let merge_key = SortKey::try_merge_key(&key_ab_2, &key_b_2).unwrap(); - assert_eq!(merge_key, key_ab_2); + assert_eq!(merge_key, &key_ab_2); // super key of (a, b) and (b') is None let merge_key = SortKey::try_merge_key(&key_ab, &key_b_2); assert_eq!(merge_key, None); @@ -431,9 +446,9 @@ mod tests { // super key of (a, b) and (a, b) is (a, b) let merge_key = SortKey::try_merge_key(&key_ab, &key_ab).unwrap(); - assert_eq!(merge_key, key_ab); + assert_eq!(merge_key, &key_ab); let merge_key = SortKey::try_merge_key(&key_ab_2, &key_ab_2).unwrap(); - assert_eq!(merge_key, key_ab_2); + assert_eq!(merge_key, &key_ab_2); // super key of (a, b) and (a',b') is None let merge_key = SortKey::try_merge_key(&key_ab, &key_ab_2); assert_eq!(merge_key, None); @@ -442,103 +457,46 @@ mod tests { // (a, b, c) let c = TIME_COLUMN_NAME; - let mut key_abc_2 = SortKey::with_capacity(3); - key_abc_2.with_col_opts("a", true, false); - key_abc_2.with_col_opts("b", false, false); - key_abc_2.with_col_opts(c, true, true); + let key_abc_2 = SortKeyBuilder::with_capacity(3) + .with_col_opts("a", true, false) + .with_col_opts("b", false, false) + .with_col_opts(c, true, true) + .build(); // (c) - let mut key_c_2 = SortKey::with_capacity(1); - key_c_2.with_col_opts(c, true, true); + let 
key_c_2 = SortKeyBuilder::with_capacity(1) + .with_col_opts(c, true, true) + .build(); // (a, c) - let mut key_ac_2 = SortKey::with_capacity(2); - key_ac_2.with_col_opts("a", true, false); - key_ac_2.with_col_opts(c, true, true); + let key_ac_2 = SortKeyBuilder::with_capacity(2) + .with_col_opts("a", true, false) + .with_col_opts(c, true, true) + .build(); // (b,c) - let mut key_bc_2 = SortKey::with_capacity(2); - key_bc_2.with_col_opts("b", false, false); - key_bc_2.with_col_opts(c, true, true); + let key_bc_2 = SortKeyBuilder::with_capacity(2) + .with_col_opts("b", false, false) + .with_col_opts(c, true, true) + .build(); // (b,a,c) - let mut key_bac_2 = SortKey::with_capacity(3); - key_bac_2.with_col_opts("b", false, false); - key_bac_2.with_col_opts("a", true, false); - key_bac_2.with_col_opts(c, true, true); + let key_bac_2 = SortKeyBuilder::with_capacity(3) + .with_col_opts("b", false, false) + .with_col_opts("a", true, false) + .with_col_opts(c, true, true) + .build(); // super key of (a, b, c) and any of { (a, c), (b, c), (a), (b), (c) } is (a, b, c) let merge_key = SortKey::try_merge_key(&key_abc_2, &key_c_2).unwrap(); - assert_eq!(merge_key, key_abc_2); + assert_eq!(merge_key, &key_abc_2); let merge_key = SortKey::try_merge_key(&key_abc_2, &key_ac_2).unwrap(); - assert_eq!(merge_key, key_abc_2); + assert_eq!(merge_key, &key_abc_2); let merge_key = SortKey::try_merge_key(&key_abc_2, &key_bc_2).unwrap(); - assert_eq!(merge_key, key_abc_2); + assert_eq!(merge_key, &key_abc_2); // super key of (a, b, c) and any of (b, a, c) } is None let merge_key = SortKey::try_merge_key(&key_abc_2, &key_bac_2); assert_eq!(merge_key, None); } - - #[test] - fn test_selected_sort_key() { - let mut sort_key = SortKey::with_capacity(4); - sort_key.with_col("a"); // default sort option - sort_key.with_col_opts("b", true, false); - sort_key.with_col_opts("c", false, false); - sort_key.with_col(TIME_COLUMN_NAME); - - // input cols is empty -> nothing selected - let cols = vec![]; - 
let selected_key = sort_key.selected_sort_key(cols); - assert!(selected_key.is_empty()); - - // input cols is not part of the key -> nothing selected - let cols = vec!["d", "e"]; - let selected_key = sort_key.selected_sort_key(cols); - assert!(selected_key.is_empty()); - - // input cols exactly the same and in the same order -> exact sort_key selected - let cols = vec!["a", "b", "c", TIME_COLUMN_NAME]; - let selected_key = sort_key.selected_sort_key(cols); - assert_eq!(selected_key, sort_key); - - // input cols exactly the same but in different order -> exact sort_key selected - let cols = vec!["c", TIME_COLUMN_NAME, "b", "a"]; - let selected_key = sort_key.selected_sort_key(cols); - assert_eq!(selected_key, sort_key); - - // input cols is subset but in the same order -> subset selected - let cols = vec!["a", "b"]; - let selected_key = sort_key.selected_sort_key(cols); - let mut expected_key = SortKey::with_capacity(2); - expected_key.with_col("a"); // default sort option - expected_key.with_col_opts("b", true, false); - assert_eq!(selected_key, expected_key); - - // input cols is subset but in the same order -> subset selected - let cols = vec![TIME_COLUMN_NAME]; - let selected_key = sort_key.selected_sort_key(cols); - let mut expected_key = SortKey::with_capacity(1); - expected_key.with_col(TIME_COLUMN_NAME); - assert_eq!(selected_key, expected_key); - - // input cols is subset but in the same order with gap -> subset selected - let cols = vec!["a", "c", TIME_COLUMN_NAME]; - let selected_key = sort_key.selected_sort_key(cols); - let mut expected_key = SortKey::with_capacity(3); - expected_key.with_col("a"); // default sort option - expected_key.with_col_opts("c", false, false); - expected_key.with_col(TIME_COLUMN_NAME); - assert_eq!(selected_key, expected_key); - - // input cols is subset but in different order -> subset in the order with sort_key selected - let cols = vec![TIME_COLUMN_NAME, "b", "c"]; - let selected_key = sort_key.selected_sort_key(cols); - let 
mut expected_key = SortKey::with_capacity(3); - expected_key.with_col_opts("b", true, false); - expected_key.with_col_opts("c", false, false); - expected_key.with_col(TIME_COLUMN_NAME); - assert_eq!(selected_key, expected_key); - } }