diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index af595d262e..d3aef40c41 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -21,15 +21,14 @@ use object_store::DynObjectStore; use observability_deps::tracing::{debug, info, warn}; use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData}; use query::{ - exec::{Executor, ExecutorType}, - frontend::reorg::ReorgPlanner, + compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner, util::compute_timenanosecond_min_max, - QueryChunk, }; -use schema::sort::SortKey; -use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use query::{exec::Executor, QueryChunk}; +use snafu::{ensure, ResultExt, Snafu}; +use std::cmp::Ordering; use std::{ - cmp::{max, min, Ordering}, + cmp::{max, min}, collections::{BTreeMap, HashSet}, ops::DerefMut, sync::Arc, @@ -160,14 +159,6 @@ pub enum Error { #[snafu(display("Error joining compaction tasks: {}", source))] CompactionJoin { source: tokio::task::JoinError }, - - #[snafu(display("Error querying partition {}", source))] - QueryingPartition { - source: iox_catalog::interface::Error, - }, - - #[snafu(display("Could not find partition {:?}", partition_id))] - PartitionNotFound { partition_id: PartitionId }, } /// A specialized `Error` for Compactor Data errors @@ -365,18 +356,6 @@ impl Compactor { Ok(candidates) } - /// Fetch the sort key for the partition stored in the catalog, if any. - async fn sort_key_from_catalog(&self, partition_id: PartitionId) -> Result> { - let mut repos = self.catalog.repositories().await; - let partition = repos - .partitions() - .get_by_id(partition_id) - .await - .context(QueryingPartitionSnafu)? - .context(PartitionNotFoundSnafu { partition_id })?; - Ok(partition.sort_key()) - } - /// Runs compaction in a partition resolving any tombstones and compacting data so that parquet /// files will be non-overlapping in time. pub async fn compact_partition( @@ -398,15 +377,6 @@ impl Compactor { if parquet_files.is_empty() { return Ok(()); } - - let sort_key_from_catalog = self - .sort_key_from_catalog(partition_id) - .await? 
- // This can happen for data in catalogs created in "the before times" - // we do not currently plan to provide an upgrade path (instead we will wipe - // old catalogs) - .expect("Partition sort key should have been available in the catalog"); - let sequencer_id = parquet_files[0].sequencer_id; let file_count = parquet_files.len(); @@ -442,9 +412,7 @@ impl Compactor { info!("compacting group of files: {:?}", original_parquet_file_ids); // compact - let split_compacted_files = self - .compact(group.parquet_files, sort_key_from_catalog.clone()) - .await?; + let split_compacted_files = self.compact(group.parquet_files).await?; debug!("compacted files"); let mut catalog_update_info = Vec::with_capacity(split_compacted_files.len()); @@ -568,7 +536,6 @@ impl Compactor { async fn compact( &self, overlapped_files: Vec, - sort_key_from_catalog: SortKey, ) -> Result> { debug!("compact {} overlapped files", overlapped_files.len()); @@ -650,7 +617,9 @@ impl Compactor { .collect(); let merged_schema = QueryableParquetChunk::merge_schemas(&query_chunks); - let sort_key = sort_key_from_catalog.filter_to(&merged_schema.primary_key()); + // Compute the sorted output of the compacting result + let sort_key = compute_sort_key_for_chunks(&merged_schema, &query_chunks); + debug!("sort key: {:?}", sort_key); // Identify split time let split_time = self.compute_split_time(min_time, max_time); @@ -735,7 +704,7 @@ impl Compactor { max_sequence_number, row_count, compaction_level: 1, // compacted result file always have level 1 - sort_key: Some(sort_key.clone()), + sort_key: None, // todo after #3968 - sort_key must have values }; let compacted_data = CompactedData::new(output_batches, meta, tombstone_map.clone()); @@ -1467,11 +1436,9 @@ mod tests { Arc::new(metric::Registry::new()), ); - let sort_key = SortKey::from_columns(["tag1", "time"]); - // ------------------------------------------------ // no files provided - let result = compactor.compact(vec![], sort_key.clone()).await.unwrap(); + let result = compactor.compact(vec![]).await.unwrap(); assert!(result.is_empty()); // ------------------------------------------------ @@ -1481,10 +1448,7 @@ mod tests { tombstones: vec![], }; // Nothing compacted for one file without tombstones - let result = compactor - .compact(vec![pf.clone()], sort_key.clone()) - .await - .unwrap(); + let result = compactor.compact(vec![pf.clone()]).await.unwrap(); assert!(result.is_empty()); // ------------------------------------------------ @@ -1496,7 +1460,7 @@ mod tests { pf.add_tombstones(vec![tombstone.tombstone.clone()]); // should have compacted data - let batches = compactor.compact(vec![pf], sort_key.clone()).await.unwrap(); + let batches = compactor.compact(vec![pf]).await.unwrap(); // 2 sets based on the split rule assert_eq!(batches.len(), 2); // Data: row tag1=VT was removed @@ -1571,8 +1535,6 @@ mod tests { Arc::new(metric::Registry::new()), ); - let sort_key = SortKey::from_columns(["tag1", "time"]); - // File 1 with tombstone let tombstone = table .with_sequencer(&sequencer) @@ -1589,10 +1551,7 @@ mod tests { }; // Compact them - let batches = compactor - .compact(vec![pf1, pf2], sort_key.clone()) - .await - .unwrap(); + let batches = compactor.compact(vec![pf1, pf2]).await.unwrap(); // 2 sets based on the split rule assert_eq!(batches.len(), 2); @@ -1683,10 +1642,6 @@ mod tests { Arc::new(metric::Registry::new()), ); - // The sort key comes from the catalog and should be the union of all tags the - // ingester has seen - let sort_key = SortKey::from_columns(["tag1", 
"tag2", "tag3", "time"]); - // File 1 with tombstone let tombstone = table .with_sequencer(&sequencer) @@ -1709,10 +1664,7 @@ mod tests { // Compact them let batches = compactor - .compact( - vec![pf1.clone(), pf2.clone(), pf3.clone()], - sort_key.clone(), - ) + .compact(vec![pf1.clone(), pf2.clone(), pf3.clone()]) .await .unwrap(); @@ -1746,10 +1698,6 @@ mod tests { ], &batches[1].data ); - - // Sort keys should be the same as was passed in to compact - assert_eq!(batches[0].meta.sort_key.as_ref().unwrap(), &sort_key); - assert_eq!(batches[1].meta.sort_key.as_ref().unwrap(), &sort_key); } /// A test utility function to make minimially-viable ParquetFile records with particular @@ -2389,7 +2337,7 @@ mod tests { max_sequence_number: SequenceNumber::new(6), row_count: 3, compaction_level: 1, // level of compacted data is always 1 - sort_key: Some(SortKey::from_columns(["tag1", "time"])), + sort_key: None, }; let chunk1 = Arc::new( diff --git a/compactor/src/query.rs b/compactor/src/query.rs index e0cf632b53..478ef5a396 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -101,7 +101,7 @@ impl QueryChunkMeta for QueryableParquetChunk { } fn sort_key(&self) -> Option<&SortKey> { - self.iox_metadata.sort_key.as_ref() + None // TODO: return the sortkey when it is available in the parquet file #3968 } fn delete_predicates(&self) -> &[Arc] { diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index b5dab5f6c2..e369896f1c 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -1,6 +1,9 @@ //! This module is responsible for compacting Ingester's data -use crate::data::{PersistingBatch, QueryableBatch}; +use crate::{ + data::{PersistingBatch, QueryableBatch}, + sort_key::{adjust_sort_key_columns, compute_sort_key}, +}; use arrow::record_batch::RecordBatch; use data_types2::{NamespaceId, PartitionInfo}; use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; @@ -12,7 +15,7 @@ use query::{ util::compute_timenanosecond_min_max, QueryChunk, QueryChunkMeta, }; -use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey}; +use schema::sort::SortKey; use snafu::{ResultExt, Snafu}; use std::sync::Arc; use time::{Time, TimeProvider}; @@ -82,10 +85,7 @@ pub async fn compact_persisting_batch( adjust_sort_key_columns(&sk, &batch.data.schema().primary_key()) } None => { - let sort_key = compute_sort_key( - batch.data.schema().as_ref(), - batch.data.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&batch.data); // Use the sort key computed from the cardinality as the sort key for this parquet // file's metadata, also return the sort key to be stored in the catalog (sort_key.clone(), Some(sort_key)) @@ -756,10 +756,7 @@ mod tests { let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -800,10 +797,7 @@ mod tests { let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -844,10 +838,7 @@ mod tests { let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - 
compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -893,10 +884,7 @@ mod tests { let expected_pk = vec!["tag1", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -939,10 +927,7 @@ mod tests { let expected_pk = vec!["tag1", "tag2", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact @@ -999,10 +984,7 @@ mod tests { let expected_pk = vec!["tag1", "tag2", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact @@ -1067,10 +1049,7 @@ mod tests { let expected_pk = vec!["tag1", "tag2", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact @@ -1128,10 +1107,7 @@ mod tests { let expected_pk = vec!["tag1", "tag2", "time"]; assert_eq!(expected_pk, pk); - let sort_key = compute_sort_key( - &schema, - compact_batch.data.iter().map(|sb| sb.data.as_ref()), - ); + let sort_key = compute_sort_key(&compact_batch); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index d839525911..0abc82104a 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -24,6 +24,7 @@ mod poison; pub mod querier_handler; pub mod query; pub mod server; +pub mod sort_key; pub mod stream_handler; #[cfg(test)] diff --git a/ingester/src/sort_key.rs b/ingester/src/sort_key.rs new file mode 100644 index 0000000000..07e46a1204 --- /dev/null +++ b/ingester/src/sort_key.rs @@ -0,0 +1,324 @@ +//! Functions for computing a sort key based on cardinality of primary key columns. 
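+//!
+//! For example (mirroring `test_sort_key` below): if `host` has 2 distinct values across the
+//! batches and `env` has 3, the lower-cardinality column comes first and `time` is always last.
+//! The snippet is illustrative only; `queryable_batch` is an assumed, already-built value.
+//!
+//! ```ignore
+//! // `queryable_batch` is an assumed `QueryableBatch` containing the batches described above.
+//! let key = compute_sort_key(&queryable_batch);
+//! assert_eq!(key, SortKey::from_columns(["host", "env", "time"]));
+//! ```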
+
+use crate::data::{QueryableBatch, SnapshotBatch};
+use arrow::{
+    array::{Array, DictionaryArray, StringArray},
+    datatypes::{DataType, Int32Type},
+    record_batch::RecordBatch,
+};
+use observability_deps::tracing::trace;
+use query::QueryChunkMeta;
+use schema::{
+    sort::{SortKey, SortKeyBuilder},
+    TIME_COLUMN_NAME,
+};
+use std::{
+    collections::{HashMap, HashSet},
+    num::NonZeroU64,
+    sync::Arc,
+};
+
+/// Given a `QueryableBatch`, compute a sort key based on:
+///
+/// - The columns that make up the primary key of the schema of this batch
+/// - Order those columns from low cardinality to high cardinality based on the data
+/// - Always have the time column last
+pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
+    let schema = queryable_batch.schema();
+    let primary_key = schema.primary_key();
+
+    let cardinalities = distinct_counts(&queryable_batch.data, &primary_key);
+
+    trace!(cardinalities=?cardinalities, "cardinalities of columns to compute sort key");
+
+    let mut cardinalities: Vec<_> = cardinalities.into_iter().collect();
+    // Sort by (cardinality, column_name) to have deterministic order if same cardinality
+    cardinalities.sort_by_cached_key(|x| (x.1, x.0.clone()));
+
+    let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1);
+    for (col, _) in cardinalities {
+        builder = builder.with_col(col)
+    }
+    builder = builder.with_col(TIME_COLUMN_NAME);
+
+    let key = builder.build();
+
+    trace!(computed_sort_key=?key, "Value of sort key from compute_sort_key");
+
+    key
+}
+
+/// Takes batches of data and the columns that make up the primary key. Computes the number of
+/// distinct values for each primary key column across all batches, also known as "cardinality".
+/// Used to determine sort order.
+fn distinct_counts(
+    batches: &[Arc<SnapshotBatch>],
+    primary_key: &[&str],
+) -> HashMap<String, NonZeroU64> {
+    let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len());
+
+    for batch in batches {
+        for (column, distinct_values) in distinct_values(&batch.data, primary_key) {
+            let set = distinct_values_across_batches
+                .entry(column)
+                .or_insert_with(HashSet::new);
+            set.extend(distinct_values.into_iter());
+        }
+    }
+
+    distinct_values_across_batches
+        .into_iter()
+        .filter_map(|(column, distinct_values)| {
+            distinct_values
+                .len()
+                .try_into()
+                .ok()
+                .and_then(NonZeroU64::new)
+                .map(|count| (column, count))
+        })
+        .collect()
+}
+
+/// Takes a `RecordBatch` and the column names that make up the primary key of the schema. Returns
+/// a map of column names to the set of the distinct string values, for the specified columns. Used
+/// to compute cardinality across multiple `RecordBatch`es.
+fn distinct_values(batch: &RecordBatch, primary_key: &[&str]) -> HashMap<String, HashSet<String>> {
+    let schema = batch.schema();
+    batch
+        .columns()
+        .iter()
+        .zip(schema.fields())
+        .filter(|(_col, field)| primary_key.contains(&field.name().as_str()))
+        .flat_map(|(col, field)| match field.data_type() {
+            // Dictionaries of I32 => Utf8 are supported as tags in
+            // `schema::InfluxColumnType::valid_arrow_type`
+            DataType::Dictionary(key, value)
+                if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+            {
+                let col = col
+                    .as_any()
+                    .downcast_ref::<DictionaryArray<Int32Type>>()
+                    .expect("unexpected datatype");
+
+                let values = col.values();
+                let values = values
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("unexpected datatype");
+
+                Some((
+                    field.name().into(),
+                    values.iter().flatten().map(ToString::to_string).collect(),
+                ))
+            }
+            // Utf8 types are supported as tags
+            DataType::Utf8 => {
+                let values = col
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("unexpected datatype");
+
+                Some((
+                    field.name().into(),
+                    values.iter().flatten().map(ToString::to_string).collect(),
+                ))
+            }
+            // No other data types are supported as tags; don't compute distinct values for them
+            _ => None,
+        })
+        .collect()
+}
+
+/// Given a sort key from the catalog and the primary key (tags + time) from the data, return the
+/// sort key that should be used for this parquet file and, if needed, the sort key that should
+/// be updated in the catalog. These are computed as follows:
+///
+/// - Columns that appear in both the primary key and the catalog sort key should appear in the
+///   same order as they appear in the catalog sort key.
+/// - If there are new columns that appear in the primary key, add the new columns to the end of
+///   the catalog sort key's tag list. Also return an updated catalog sort key to save the new
+///   column in the catalog.
+/// - If there are columns that appear in the catalog sort key but aren't present in this data's
+///   primary key, don't include them in the sort key to be used for this data. Don't remove them
+///   from the catalog sort key.
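+///
+/// # Example
+///
+/// A sketch of the intended behaviour, mirroring `test_adjust_sort_key_columns` below; the column
+/// names are illustrative only:
+///
+/// ```ignore
+/// // Catalog already knows ["host", "env", "time"]; this data introduces a new tag "temp".
+/// let catalog_key = SortKey::from_columns(["host", "env", "time"]);
+/// let (file_key, catalog_update) =
+///     adjust_sort_key_columns(&catalog_key, &["host", "temp", "env", "time"]);
+///
+/// // New tags are appended before time, and the catalog gets the same updated key.
+/// assert_eq!(file_key, SortKey::from_columns(["host", "env", "temp", "time"]));
+/// assert_eq!(catalog_update.unwrap(), file_key);
+/// ```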
+pub fn adjust_sort_key_columns(
+    catalog_sort_key: &SortKey,
+    primary_key: &[&str],
+) -> (SortKey, Option<SortKey>) {
+    let existing_columns_without_time = catalog_sort_key
+        .iter()
+        .map(|(col, _opts)| col)
+        .cloned()
+        .filter(|col| TIME_COLUMN_NAME != col.as_ref());
+    let new_columns: Vec<_> = primary_key
+        .iter()
+        .filter(|col| !catalog_sort_key.contains(col))
+        .collect();
+
+    let metadata_sort_key = SortKey::from_columns(
+        existing_columns_without_time
+            .clone()
+            .filter(|col| primary_key.contains(&col.as_ref()))
+            .chain(new_columns.iter().map(|&&col| Arc::from(col)))
+            .chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
+    );
+
+    let catalog_update = if new_columns.is_empty() {
+        None
+    } else {
+        Some(SortKey::from_columns(
+            existing_columns_without_time
+                .chain(new_columns.into_iter().map(|&col| Arc::from(col)))
+                .chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
+        ))
+    };
+
+    (metadata_sort_key, catalog_update)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use data_types2::SequenceNumber;
+    use schema::selection::Selection;
+
+    fn lp_to_queryable_batch(line_protocol_batches: &[&str]) -> QueryableBatch {
+        let data = line_protocol_batches
+            .iter()
+            .map(|line_protocol| {
+                let (_, mb) = mutable_batch_lp::test_helpers::lp_to_mutable_batch(line_protocol);
+                let rb = mb.to_arrow(Selection::All).unwrap();
+
+                Arc::new(SnapshotBatch {
+                    min_sequencer_number: SequenceNumber::new(0),
+                    max_sequencer_number: SequenceNumber::new(1),
+                    data: Arc::new(rb),
+                })
+            })
+            .collect();
+
+        QueryableBatch {
+            data,
+            delete_predicates: Default::default(),
+            table_name: Default::default(),
+        }
+    }
+
+    #[test]
+    fn test_distinct_values() {
+        let lp = r#"
+            cpu,host=a val=23 1
+            cpu,host=b,env=prod val=2 1
+            cpu,host=c,env=stage val=11 1
+            cpu,host=a,env=prod val=14 2
+        "#;
+        let qb = lp_to_queryable_batch(&[lp]);
+        let rb = &qb.data[0].data;
+
+        // Pass the tag field names plus time as the primary key, this is what should happen
+        let distinct = distinct_values(rb, &["host", "env", "time"]);
+
+        // The hashmap should contain the distinct values for "host" and "env" only
+        assert_eq!(distinct.len(), 2);
+
+        // Return unique values
+        assert_eq!(
+            *distinct.get("host").unwrap(),
+            HashSet::from(["a".into(), "b".into(), "c".into()]),
+        );
+        // TODO: do nulls count as a value?
+        assert_eq!(
+            *distinct.get("env").unwrap(),
+            HashSet::from(["prod".into(), "stage".into()]),
+        );
+
+        // Requesting a column not present returns None
+        assert_eq!(distinct.get("foo"), None);
+
+        // Distinct count isn't computed for the time column or fields
+        assert_eq!(distinct.get("time"), None);
+        assert_eq!(distinct.get("val"), None);
+
+        // Specify a column in the primary key that doesn't appear in the data
+        let distinct = distinct_values(rb, &["host", "env", "foo", "time"]);
+        // The hashmap should contain the distinct values for "host" and "env" only
+        assert_eq!(distinct.len(), 2);
+
+        // Don't specify one of the tag columns for the primary key
+        let distinct = distinct_values(rb, &["host", "foo", "time"]);
+        // The hashmap should contain the distinct values for the specified columns only
+        assert_eq!(distinct.len(), 1);
+    }
+
+    #[test]
+    fn test_sort_key() {
+        // Across these three record batches:
+        // - `host` has 2 distinct values: "a", "b"
+        // - 'env' has 3 distinct values: "prod", "stage", "dev"
+        // host's 2 values appear in each record batch, so the distinct counts could be incorrectly
+        // aggregated together as 2 + 2 + 2 = 6.
env's 3 values each occur in their own record + // batch, so they should always be aggregated as 3. + // host has the lower cardinality, so it should appear first in the sort key. + let lp1 = r#" + cpu,host=a,env=prod val=23 1 + cpu,host=b,env=prod val=2 2 + "#; + let lp2 = r#" + cpu,host=a,env=stage val=23 3 + cpu,host=b,env=stage val=2 4 + "#; + let lp3 = r#" + cpu,host=a,env=dev val=23 5 + cpu,host=b,env=dev val=2 6 + "#; + let qb = lp_to_queryable_batch(&[lp1, lp2, lp3]); + + let sort_key = compute_sort_key(&qb); + + assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"])); + } + + #[test] + fn test_adjust_sort_key_columns() { + // If the catalog sort key is the same as the primary key, no changes + let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); + let data_primary_key = ["host", "env", "time"]; + + let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); + + assert_eq!(metadata, catalog_sort_key); + assert!(update.is_none()); + + // If the catalog sort key contains more columns than the primary key, the metadata key + // should only contain the columns in the data and the catalog should not be updated + let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); + let data_primary_key = ["host", "time"]; + + let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); + + assert_eq!(metadata, SortKey::from_columns(data_primary_key)); + assert!(update.is_none()); + + // If the catalog sort key contains fewer columns than the primary key, add the new columns + // just before the time column and update the catalog + let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); + let data_primary_key = ["host", "temp", "env", "time"]; + + let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); + + let expected = SortKey::from_columns(["host", "env", "temp", "time"]); + assert_eq!(metadata, expected); + assert_eq!(update.unwrap(), expected); + + // If the sort key contains a column that doesn't exist in the data and is missing a column, + // the metadata key should only contain the columns in the data and the catalog should be + // updated to include the new column (but not remove the missing column) + let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); + let data_primary_key = ["host", "temp", "time"]; + + let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); + assert_eq!(metadata, SortKey::from_columns(data_primary_key)); + let expected = SortKey::from_columns(["host", "env", "temp", "time"]); + assert_eq!(update.unwrap(), expected); + } +} diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 4ee7a0de72..6f72bd3b8c 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -7,11 +7,11 @@ use arrow::{ use bytes::Bytes; use data_types2::{ Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams, - Partition, PartitionId, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, - Timestamp, Tombstone, TombstoneId, + Partition, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, + Tombstone, TombstoneId, }; use iox_catalog::{ - interface::{Catalog, PartitionRepo, INITIAL_COMPACTION_LEVEL}, + interface::{Catalog, INITIAL_COMPACTION_LEVEL}, mem::MemCatalog, }; use iox_object_store::{IoxObjectStore, ParquetFilePath}; @@ -22,7 +22,7 @@ use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData}; use 
query::exec::Executor; use schema::{ selection::Selection, - sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder}, + sort::{SortKey, SortKeyBuilder}, }; use std::sync::Arc; use time::{MockProvider, Time, TimeProvider}; @@ -435,7 +435,7 @@ impl TestPartition { max_sequence_number, row_count: row_count as i64, compaction_level: INITIAL_COMPACTION_LEVEL, - sort_key: Some(sort_key.clone()), + sort_key: Some(sort_key), }; let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file(&self.catalog.object_store, &metadata, record_batch).await; @@ -462,8 +462,6 @@ impl TestPartition { .await .unwrap(); - update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await; - Arc::new(TestParquetFile { catalog: Arc::clone(&self.catalog), namespace: Arc::clone(&self.namespace), @@ -509,7 +507,7 @@ impl TestPartition { max_sequence_number, row_count: row_count as i64, compaction_level: INITIAL_COMPACTION_LEVEL, - sort_key: Some(sort_key.clone()), + sort_key: Some(sort_key), }; let (parquet_metadata_bin, _real_file_size_bytes) = create_parquet_file(&self.catalog.object_store, &metadata, record_batch).await; @@ -536,8 +534,6 @@ impl TestPartition { .await .unwrap(); - update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await; - Arc::new(TestParquetFile { catalog: Arc::clone(&self.catalog), namespace: Arc::clone(&self.namespace), @@ -546,49 +542,6 @@ impl TestPartition { } } -async fn update_catalog_sort_key_if_needed( - partitions_catalog: &mut dyn PartitionRepo, - partition_id: PartitionId, - sort_key: SortKey, -) { - // Fetch the latest partition info from the catalog - let partition = partitions_catalog - .get_by_id(partition_id) - .await - .unwrap() - .unwrap(); - - // Similarly to what the ingester does, if there's an existing sort key in the catalog, add new - // columns onto the end - match partition.sort_key() { - Some(catalog_sort_key) => { - let sort_key_string = sort_key.to_columns(); - let new_sort_key: Vec<_> = sort_key_string.split(',').collect(); - let (_metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &new_sort_key); - if let Some(new_sort_key) = update { - let new_columns = new_sort_key.to_columns(); - dbg!( - "Updating sort key from {:?} to {:?}", - catalog_sort_key.to_columns(), - &new_columns, - ); - partitions_catalog - .update_sort_key(partition_id, &new_columns) - .await - .unwrap(); - } - } - None => { - let new_columns = sort_key.to_columns(); - dbg!("Updating sort key from None to {:?}", &new_columns,); - partitions_catalog - .update_sort_key(partition_id, &new_columns) - .await - .unwrap(); - } - } -} - /// Create parquet file and return thrift-encoded and zstd-compressed parquet metadata as well as the file size. 
async fn create_parquet_file( object_store: &Arc, diff --git a/query/src/provider.rs b/query/src/provider.rs index 7caef32a4a..fea23c39ee 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -442,9 +442,7 @@ impl Deduplicater { // the primary key, in order for deduplication to work correctly assert!( pk_schema.len() <= sort_key.len(), - "output_sort_key ({:?}) must be at least as long as the primary key ({:?})", - sort_key.to_columns(), - pk_schema, + "output_sort_key must be at least as long as the primary key" ); assert!( pk_schema.is_sorted_on_pk(sort_key), diff --git a/schema/src/sort.rs b/schema/src/sort.rs index 098c8e4d62..93c325540e 100644 --- a/schema/src/sort.rs +++ b/schema/src/sort.rs @@ -1,20 +1,12 @@ -use crate::{Schema, TIME_COLUMN_NAME}; +use std::sync::Arc; +use std::{fmt::Display, str::FromStr}; + use arrow::compute::SortOptions; -use arrow::{ - array::{Array, DictionaryArray, StringArray}, - datatypes::{DataType, Int32Type}, - record_batch::RecordBatch, -}; use indexmap::{map::Iter, IndexMap}; use itertools::Itertools; use snafu::Snafu; -use std::{ - collections::{HashMap, HashSet}, - fmt::Display, - num::NonZeroU64, - str::FromStr, - sync::Arc, -}; + +use super::TIME_COLUMN_NAME; #[derive(Debug, Snafu)] pub enum Error { @@ -203,32 +195,6 @@ impl SortKey { self.columns.is_empty() } - /// Filters this sort key to contain only the columns present in the primary key, in the order - /// that the columns appear in this sort key. - /// - /// # Panics - /// - /// Panics if any columns in the primary key are NOT present in this sort key. - pub fn filter_to(&self, primary_key: &[&str]) -> SortKey { - let missing_from_catalog_key: Vec<_> = primary_key - .iter() - .filter(|col| !self.contains(col)) - .collect(); - if !missing_from_catalog_key.is_empty() { - panic!( - "Primary key column(s) found that don't appear in the catalog sort key: [{:?}]", - missing_from_catalog_key - ) - } - - Self::from_columns( - self.iter() - .map(|(col, _opts)| col) - .filter(|col| primary_key.contains(&col.as_ref())) - .cloned(), - ) - } - /// Returns merge key of the 2 given keys if one covers the other. Returns None otherwise. /// Key1 is said to cover key2 if key2 is a subset and in the same order of key1. /// Examples: @@ -322,164 +288,9 @@ impl Display for SortKey { } } -/// Given a `Schema` and an iterator of `RecordBatch`es, compute a sort key based on: -/// -/// - The columns that make up the primary key of the schema -/// - Order those columns from low cardinality to high cardinality based on the data -/// - Always have the time column last -pub fn compute_sort_key<'a>( - schema: &Schema, - batches: impl Iterator, -) -> SortKey { - let primary_key = schema.primary_key(); - - let cardinalities = distinct_counts(batches, &primary_key); - - let mut cardinalities: Vec<_> = cardinalities.into_iter().collect(); - // Sort by (cardinality, column_name) to have deterministic order if same cardinality - cardinalities.sort_by_cached_key(|x| (x.1, x.0.clone())); - - let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1); - for (col, _) in cardinalities { - builder = builder.with_col(col) - } - builder = builder.with_col(TIME_COLUMN_NAME); - builder.build() -} - -/// Takes batches of data and the columns that make up the primary key. Computes the number of -/// distinct values for each primary key column across all batches, also known as "cardinality". -/// Used to determine sort order. 
-fn distinct_counts<'a>( - batches: impl Iterator, - primary_key: &[&str], -) -> HashMap { - let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len()); - - for batch in batches { - for (column, distinct_values) in distinct_values(batch, primary_key) { - let set = distinct_values_across_batches - .entry(column) - .or_insert_with(HashSet::new); - set.extend(distinct_values.into_iter()); - } - } - - distinct_values_across_batches - .into_iter() - .filter_map(|(column, distinct_values)| { - distinct_values - .len() - .try_into() - .ok() - .and_then(NonZeroU64::new) - .map(|count| (column, count)) - }) - .collect() -} - -/// Takes a `RecordBatch` and the column names that make up the primary key of the schema. Returns -/// a map of column names to the set of the distinct string values, for the specified columns. Used -/// to compute cardinality across multiple `RecordBatch`es. -fn distinct_values(batch: &RecordBatch, primary_key: &[&str]) -> HashMap> { - let schema = batch.schema(); - batch - .columns() - .iter() - .zip(schema.fields()) - .filter(|(_col, field)| primary_key.contains(&field.name().as_str())) - .flat_map(|(col, field)| match field.data_type() { - // Dictionaries of I32 => Utf8 are supported as tags in - // `schema::InfluxColumnType::valid_arrow_type` - DataType::Dictionary(key, value) - if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => - { - let col = col - .as_any() - .downcast_ref::>() - .expect("unexpected datatype"); - - let values = col.values(); - let values = values - .as_any() - .downcast_ref::() - .expect("unexpected datatype"); - - Some(( - field.name().into(), - values.iter().flatten().map(ToString::to_string).collect(), - )) - } - // Utf8 types are supported as tags - DataType::Utf8 => { - let values = col - .as_any() - .downcast_ref::() - .expect("unexpected datatype"); - - Some(( - field.name().into(), - values.iter().flatten().map(ToString::to_string).collect(), - )) - } - // No other data types are supported as tags; don't compute distinct values for them - _ => None, - }) - .collect() -} - -/// Given a sort key from the catalog and the primary key (tags + time) from the data, return the -/// sort key that should be used for this parquet file and, if needed, the sort key that should -/// be updated in the catalog. These are computed as follows: -/// -/// - Columns that appear in both the primary key and the catalog sort key should appear in the -/// same order as they appear in the catalog sort key. -/// - If there are new columns that appear in the primary key, add the new columns to the end of -/// the catalog sort key's tag list. Also return an updated catalog sort key to save the new -/// column in the catalog. -/// - If there are columns that appear in the catalog sort key but aren't present in this data's -/// primary key, don't include them in the sort key to be used for this data. Don't remove them -/// from the catalog sort key. 
-pub fn adjust_sort_key_columns( - catalog_sort_key: &SortKey, - primary_key: &[&str], -) -> (SortKey, Option) { - let existing_columns_without_time = catalog_sort_key - .iter() - .map(|(col, _opts)| col) - .cloned() - .filter(|col| TIME_COLUMN_NAME != col.as_ref()); - let new_columns: Vec<_> = primary_key - .iter() - .filter(|col| !catalog_sort_key.contains(col)) - .collect(); - - let metadata_sort_key = SortKey::from_columns( - existing_columns_without_time - .clone() - .filter(|col| primary_key.contains(&col.as_ref())) - .chain(new_columns.iter().map(|&&col| Arc::from(col))) - .chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))), - ); - - let catalog_update = if new_columns.is_empty() { - None - } else { - Some(SortKey::from_columns( - existing_columns_without_time - .chain(new_columns.into_iter().map(|&col| Arc::from(col))) - .chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))), - )) - }; - - (metadata_sort_key, catalog_update) -} - #[cfg(test)] mod tests { use super::*; - use crate::builder::SchemaBuilder; - use arrow::array::ArrayRef; #[test] fn test_parse() { @@ -697,182 +508,4 @@ mod tests { let merge_key = SortKey::try_merge_key(&key_abc_2, &key_bac_2); assert_eq!(merge_key, None); } - - fn to_string_array(values: impl Into) -> ArrayRef { - Arc::new(values.into()) as ArrayRef - } - - #[test] - fn test_distinct_values() { - let rb = RecordBatch::try_from_iter(vec![ - ("host", to_string_array(vec!["a", "b", "c", "a"])), - ( - "env", - to_string_array(vec![None, Some("prod"), Some("stage"), Some("prod")]), - ), - ]) - .unwrap(); - - // Pass the tag field names plus time as the primary key, this is what should happen - let distinct = distinct_values(&rb, &["host", "env", "time"]); - - // The hashmap should contain the distinct values for "host" and "env" only - assert_eq!(distinct.len(), 2); - - // Return unique values - assert_eq!( - *distinct.get("host").unwrap(), - HashSet::from(["a".into(), "b".into(), "c".into()]), - ); - // TODO: do nulls count as a value? - assert_eq!( - *distinct.get("env").unwrap(), - HashSet::from(["prod".into(), "stage".into()]), - ); - - // Requesting a column not present returns None - assert_eq!(distinct.get("foo"), None); - - // Distinct count isn't computed for the time column or fields - assert_eq!(distinct.get("time"), None); - assert_eq!(distinct.get("val"), None); - - // Specify a column in the primary key that doesn't appear in the data - let distinct = distinct_values(&rb, &["host", "env", "foo", "time"]); - // The hashmap should contain the distinct values for "host" and "env" only - assert_eq!(distinct.len(), 2); - - // Don't specify one of the tag columns for the primary key - let distinct = distinct_values(&rb, &["host", "foo", "time"]); - // The hashmap should contain the distinct values for the specified columns only - assert_eq!(distinct.len(), 1); - } - - #[test] - fn test_sort_key() { - // Across these three record batches: - // - `host` has 2 distinct values: "a", "b" - // - 'env' has 3 distinct values: "prod", "stage", "dev" - // host's 2 values appear in each record batch, so the distinct counts could be incorrectly - // aggregated together as 2 + 2 + 2 = 6. env's 3 values each occur in their own record - // batch, so they should always be aggregated as 3. - // host has the lower cardinality, so it should appear first in the sort key. 
- let rb1 = Arc::new( - RecordBatch::try_from_iter(vec![ - ("host", to_string_array(vec!["a", "b"])), - ("env", to_string_array(vec!["prod", "prod"])), - ]) - .unwrap(), - ); - let rb2 = Arc::new( - RecordBatch::try_from_iter(vec![ - ("host", to_string_array(vec!["a", "b"])), - ("env", to_string_array(vec!["stage", "stage"])), - ]) - .unwrap(), - ); - let rb3 = Arc::new( - RecordBatch::try_from_iter(vec![ - ("host", to_string_array(vec!["a", "b"])), - ("env", to_string_array(vec!["dev", "dev"])), - ]) - .unwrap(), - ); - let rbs = [rb1, rb2, rb3]; - let schema = SchemaBuilder::new() - .tag("host") - .tag("env") - .timestamp() - .build() - .unwrap(); - - let sort_key = compute_sort_key(&schema, rbs.iter().map(|rb| rb.as_ref())); - - assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"])); - } - - #[test] - fn test_adjust_sort_key_columns() { - // If the catalog sort key is the same as the primary key, no changes - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "env", "time"]; - - let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); - - assert_eq!(metadata, catalog_sort_key); - assert!(update.is_none()); - - // If the catalog sort key contains more columns than the primary key, the metadata key - // should only contain the columns in the data and the catalog should not be updated - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "time"]; - - let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); - - assert_eq!(metadata, SortKey::from_columns(data_primary_key)); - assert!(update.is_none()); - - // If the catalog sort key contains fewer columns than the primary key, add the new columns - // just before the time column and update the catalog - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "temp", "env", "time"]; - - let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); - - let expected = SortKey::from_columns(["host", "env", "temp", "time"]); - assert_eq!(metadata, expected); - assert_eq!(update.unwrap(), expected); - - // If the sort key contains a column that doesn't exist in the data and is missing a column, - // the metadata key should only contain the columns in the data and the catalog should be - // updated to include the new column (but not remove the missing column) - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "temp", "time"]; - - let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key); - assert_eq!(metadata, SortKey::from_columns(data_primary_key)); - let expected = SortKey::from_columns(["host", "env", "temp", "time"]); - assert_eq!(update.unwrap(), expected); - } - - #[test] - fn test_filter_to_primary_key() { - // If the catalog sort key is the same as the primary key, no changes - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "env", "time"]; - - let filtered = catalog_sort_key.filter_to(&data_primary_key); - assert_eq!(catalog_sort_key, filtered); - - // If the catalog sort key contains more columns than the primary key, the filtered key - // should only contain the columns in the primary key - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "time"]; - - let filtered = 
catalog_sort_key.filter_to(&data_primary_key); - let expected = SortKey::from_columns(["host", "time"]); - assert_eq!(expected, filtered); - - // If the catalog sort key has columns in a different order than the primary key, the - // filtered key should contain the columns in the same order as the catalog sort key. - let catalog_sort_key = SortKey::from_columns(["host", "env", "zone", "time"]); - let data_primary_key = ["env", "host", "time"]; - - let filtered = catalog_sort_key.filter_to(&data_primary_key); - let expected = SortKey::from_columns(["host", "env", "time"]); - assert_eq!(expected, filtered); - } - - #[test] - #[should_panic] - fn test_filter_missing_columns() { - // If the primary key has columns that are missing from the catalog sort key, panic. - // This should never happen because the ingester should save the sort key as the union - // of all primary key columns it has seen, and the compactor shouldn't get data that hasn't - // been through the ingester. - let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]); - let data_primary_key = ["host", "env", "zone", "time"]; - - catalog_sort_key.filter_to(&data_primary_key); - } }
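For orientation, here is a minimal sketch (not taken from the patch) of how the two ingester entry points split the work in the persist path, mirroring the `compact_persisting_batch` change above. `choose_sort_key` is a hypothetical wrapper name, and the imports assume `data` and `sort_key` are public modules of the `ingester` crate:

```rust
// A minimal sketch of how the persist path chooses a sort key, mirroring the
// `compact_persisting_batch` hunk above. `choose_sort_key` is a hypothetical
// wrapper; the other items come from this patch.
use ingester::{
    data::QueryableBatch,
    sort_key::{adjust_sort_key_columns, compute_sort_key},
};
use query::QueryChunkMeta;
use schema::sort::SortKey;

/// Returns (sort key to use for this parquet file, optional new key to store in the catalog).
fn choose_sort_key(
    batch: &QueryableBatch,
    catalog_sort_key: Option<&SortKey>,
) -> (SortKey, Option<SortKey>) {
    match catalog_sort_key {
        // The partition has been persisted before: keep the catalog's column order and
        // append any tags this batch introduces (updating the catalog only if needed).
        Some(key) => adjust_sort_key_columns(key, &batch.schema().primary_key()),
        // First persist for this partition: derive the order from column cardinality
        // and store that same key in the catalog.
        None => {
            let key = compute_sort_key(batch);
            (key.clone(), Some(key))
        }
    }
}
```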