fix: Revert "feat: Use the sort key stored in the catalog during compaction" (#4299)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Carol (Nichols || Goulding) 2022-04-13 10:11:10 -04:00 committed by GitHub
parent 12800bd07e
commit 7ddbf7c025
8 changed files with 368 additions and 535 deletions
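
Since the commit carries no description beyond its title, the sketch below summarizes the behavioural change in the compactor: instead of trusting the sort key the ingester stored in the catalog and filtering it down to the columns present in the files being compacted, the compactor goes back to recomputing a sort key from the data, ordering primary-key columns from low to high cardinality with time last. This is a deliberately simplified, std-only toy model; the helper names, column names, and counts are invented, and none of it is IOx code.

// Toy model only, not IOx code: plain std types stand in for `SortKey`, `Schema`,
// and chunk statistics; the column names and distinct counts are invented.

/// Behaviour removed by this revert (in the spirit of `SortKey::filter_to`):
/// keep the catalog's column order, dropping columns absent from the data.
fn catalog_key_filtered<'a>(catalog_key: &[&'a str], data_columns: &[&str]) -> Vec<&'a str> {
    catalog_key
        .iter()
        .copied()
        .filter(|col| data_columns.contains(col))
        .collect()
}

/// Behaviour restored by this revert (in the spirit of `compute_sort_key_for_chunks`):
/// order primary-key columns from low to high cardinality, with `time` always last.
fn key_from_cardinality(mut distinct_counts: Vec<(&str, u64)>) -> Vec<&str> {
    // Sort by (cardinality, column name) so ties are deterministic.
    distinct_counts.sort_by_key(|&(name, count)| (count, name.to_string()));
    distinct_counts
        .into_iter()
        .map(|(name, _)| name)
        .chain(std::iter::once("time"))
        .collect()
}

fn main() {
    // Hypothetical partition: the catalog recorded ["tag1", "tag2", "time"], but the
    // files selected for this compaction only contain `tag2`.
    let catalog_key = ["tag1", "tag2", "time"];
    let data_columns = ["tag2", "time"];
    let distinct_counts = vec![("tag2", 3)];

    // Pre-revert: take the key from the catalog and filter it to the data.
    assert_eq!(catalog_key_filtered(&catalog_key, &data_columns), ["tag2", "time"]);
    // Post-revert: recompute the key from the data itself.
    assert_eq!(key_from_cardinality(distinct_counts), ["tag2", "time"]);
}

The hunks below remove the catalog lookup (`sort_key_from_catalog`), the `SortKey::filter_to` helper, and the catalog-backed parquet metadata, and restore `compute_sort_key_for_chunks` in the compactor plus the cardinality helpers now living in `ingester/src/sort_key.rs`.
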


@@ -21,15 +21,14 @@ use object_store::DynObjectStore;
use observability_deps::tracing::{debug, info, warn};
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
use query::{
exec::{Executor, ExecutorType},
frontend::reorg::ReorgPlanner,
compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner,
util::compute_timenanosecond_min_max,
QueryChunk,
};
use schema::sort::SortKey;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use query::{exec::Executor, QueryChunk};
use snafu::{ensure, ResultExt, Snafu};
use std::cmp::Ordering;
use std::{
cmp::{max, min, Ordering},
cmp::{max, min},
collections::{BTreeMap, HashSet},
ops::DerefMut,
sync::Arc,
@@ -160,14 +159,6 @@ pub enum Error {
#[snafu(display("Error joining compaction tasks: {}", source))]
CompactionJoin { source: tokio::task::JoinError },
#[snafu(display("Error querying partition {}", source))]
QueryingPartition {
source: iox_catalog::interface::Error,
},
#[snafu(display("Could not find partition {:?}", partition_id))]
PartitionNotFound { partition_id: PartitionId },
}
/// A specialized `Error` for Compactor Data errors
@@ -365,18 +356,6 @@ impl Compactor {
Ok(candidates)
}
/// Fetch the sort key for the partition stored in the catalog, if any.
async fn sort_key_from_catalog(&self, partition_id: PartitionId) -> Result<Option<SortKey>> {
let mut repos = self.catalog.repositories().await;
let partition = repos
.partitions()
.get_by_id(partition_id)
.await
.context(QueryingPartitionSnafu)?
.context(PartitionNotFoundSnafu { partition_id })?;
Ok(partition.sort_key())
}
/// Runs compaction in a partition, resolving any tombstones and compacting data so that parquet
/// files will be non-overlapping in time.
pub async fn compact_partition(
@@ -398,15 +377,6 @@ impl Compactor {
if parquet_files.is_empty() {
return Ok(());
}
let sort_key_from_catalog = self
.sort_key_from_catalog(partition_id)
.await?
// This can happen for data in catalogs created in "the before times";
// we do not currently plan to provide an upgrade path (instead we will wipe
// old catalogs)
.expect("Partition sort key should have been available in the catalog");
let sequencer_id = parquet_files[0].sequencer_id;
let file_count = parquet_files.len();
@@ -442,9 +412,7 @@
info!("compacting group of files: {:?}", original_parquet_file_ids);
// compact
let split_compacted_files = self
.compact(group.parquet_files, sort_key_from_catalog.clone())
.await?;
let split_compacted_files = self.compact(group.parquet_files).await?;
debug!("compacted files");
let mut catalog_update_info = Vec::with_capacity(split_compacted_files.len());
@@ -568,7 +536,6 @@ impl Compactor {
async fn compact(
&self,
overlapped_files: Vec<ParquetFileWithTombstone>,
sort_key_from_catalog: SortKey,
) -> Result<Vec<CompactedData>> {
debug!("compact {} overlapped files", overlapped_files.len());
@@ -650,7 +617,9 @@
.collect();
let merged_schema = QueryableParquetChunk::merge_schemas(&query_chunks);
let sort_key = sort_key_from_catalog.filter_to(&merged_schema.primary_key());
// Compute the sort key that determines the sorted output of the compaction
let sort_key = compute_sort_key_for_chunks(&merged_schema, &query_chunks);
debug!("sort key: {:?}", sort_key);
// Identify split time
let split_time = self.compute_split_time(min_time, max_time);
@@ -735,7 +704,7 @@
max_sequence_number,
row_count,
compaction_level: 1, // compacted result files always have level 1
sort_key: Some(sort_key.clone()),
sort_key: None, // todo after #3968 - sort_key must have values
};
let compacted_data = CompactedData::new(output_batches, meta, tombstone_map.clone());
@@ -1467,11 +1436,9 @@ mod tests {
Arc::new(metric::Registry::new()),
);
let sort_key = SortKey::from_columns(["tag1", "time"]);
// ------------------------------------------------
// no files provided
let result = compactor.compact(vec![], sort_key.clone()).await.unwrap();
let result = compactor.compact(vec![]).await.unwrap();
assert!(result.is_empty());
// ------------------------------------------------
@@ -1481,10 +1448,7 @@
tombstones: vec![],
};
// Nothing compacted for one file without tombstones
let result = compactor
.compact(vec![pf.clone()], sort_key.clone())
.await
.unwrap();
let result = compactor.compact(vec![pf.clone()]).await.unwrap();
assert!(result.is_empty());
// ------------------------------------------------
@@ -1496,7 +1460,7 @@
pf.add_tombstones(vec![tombstone.tombstone.clone()]);
// should have compacted data
let batches = compactor.compact(vec![pf], sort_key.clone()).await.unwrap();
let batches = compactor.compact(vec![pf]).await.unwrap();
// 2 sets based on the split rule
assert_eq!(batches.len(), 2);
// Data: row tag1=VT was removed
@@ -1571,8 +1535,6 @@ mod tests {
Arc::new(metric::Registry::new()),
);
let sort_key = SortKey::from_columns(["tag1", "time"]);
// File 1 with tombstone
let tombstone = table
.with_sequencer(&sequencer)
@@ -1589,10 +1551,7 @@
};
// Compact them
let batches = compactor
.compact(vec![pf1, pf2], sort_key.clone())
.await
.unwrap();
let batches = compactor.compact(vec![pf1, pf2]).await.unwrap();
// 2 sets based on the split rule
assert_eq!(batches.len(), 2);
@@ -1683,10 +1642,6 @@ mod tests {
Arc::new(metric::Registry::new()),
);
// The sort key comes from the catalog and should be the union of all tags the
// ingester has seen
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
// File 1 with tombstone
let tombstone = table
.with_sequencer(&sequencer)
@@ -1709,10 +1664,7 @@
// Compact them
let batches = compactor
.compact(
vec![pf1.clone(), pf2.clone(), pf3.clone()],
sort_key.clone(),
)
.compact(vec![pf1.clone(), pf2.clone(), pf3.clone()])
.await
.unwrap();
@@ -1746,10 +1698,6 @@
],
&batches[1].data
);
// Sort keys should be the same as was passed in to compact
assert_eq!(batches[0].meta.sort_key.as_ref().unwrap(), &sort_key);
assert_eq!(batches[1].meta.sort_key.as_ref().unwrap(), &sort_key);
}
/// A test utility function to make minimally-viable ParquetFile records with particular
@@ -2389,7 +2337,7 @@ mod tests {
max_sequence_number: SequenceNumber::new(6),
row_count: 3,
compaction_level: 1, // level of compacted data is always 1
sort_key: Some(SortKey::from_columns(["tag1", "time"])),
sort_key: None,
};
let chunk1 = Arc::new(


@@ -101,7 +101,7 @@ impl QueryChunkMeta for QueryableParquetChunk {
}
fn sort_key(&self) -> Option<&SortKey> {
self.iox_metadata.sort_key.as_ref()
None // TODO: return the sortkey when it is available in the parquet file #3968
}
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {


@@ -1,6 +1,9 @@
//! This module is responsible for compacting Ingester's data
use crate::data::{PersistingBatch, QueryableBatch};
use crate::{
data::{PersistingBatch, QueryableBatch},
sort_key::{adjust_sort_key_columns, compute_sort_key},
};
use arrow::record_batch::RecordBatch;
use data_types2::{NamespaceId, PartitionInfo};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
@@ -12,7 +15,7 @@ use query::{
util::compute_timenanosecond_min_max,
QueryChunk, QueryChunkMeta,
};
use schema::sort::{adjust_sort_key_columns, compute_sort_key, SortKey};
use schema::sort::SortKey;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use time::{Time, TimeProvider};
@@ -82,10 +85,7 @@ pub async fn compact_persisting_batch(
adjust_sort_key_columns(&sk, &batch.data.schema().primary_key())
}
None => {
let sort_key = compute_sort_key(
batch.data.schema().as_ref(),
batch.data.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&batch.data);
// Use the sort key computed from the cardinality as the sort key for this parquet
// file's metadata; also return the sort key to be stored in the catalog
(sort_key.clone(), Some(sort_key))
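
The hunk above shows only part of the `match` in `compact_persisting_batch`, so here is a condensed, std-only toy of the decision it encodes: if the catalog already holds a sort key for the partition, adjust it to this batch's columns (possibly returning an updated key to store back); if there is none yet, compute one from the batch's cardinalities and ask for it to be stored. `Vec<String>` stands in for `SortKey`, the cardinalities are passed in rather than measured from the data, and the helper is hypothetical rather than IOx code.

// Toy model of this branch only, not IOx code.

/// Returns (sort key for this parquet file's metadata,
///          sort key to create/update in the catalog, if any).
fn choose_sort_key(
    catalog_key: Option<Vec<String>>,
    data_primary_key: &[&str],       // tag columns in this batch, plus "time"
    distinct_counts: &[(&str, u64)], // per-tag cardinality of this batch
) -> (Vec<String>, Option<Vec<String>>) {
    match catalog_key {
        // Catalog already has a key: keep its order, drop columns absent from this
        // batch for the file's key, append newly seen columns before "time", and
        // update the catalog only if something new appeared (the role
        // `adjust_sort_key_columns` plays above).
        Some(existing) => {
            let new_cols: Vec<&str> = data_primary_key
                .iter()
                .copied()
                .filter(|c| *c != "time" && !existing.iter().any(|e| e.as_str() == *c))
                .collect();
            let rebuild = |keep_only_present: bool| -> Vec<String> {
                existing
                    .iter()
                    .filter(|c| c.as_str() != "time")
                    .filter(|c| !keep_only_present || data_primary_key.contains(&c.as_str()))
                    .cloned()
                    .chain(new_cols.iter().map(|c| c.to_string()))
                    .chain(std::iter::once("time".to_string()))
                    .collect()
            };
            let update = (!new_cols.is_empty()).then(|| rebuild(false));
            (rebuild(true), update)
        }
        // No key yet: derive one from cardinality (the role `compute_sort_key`
        // plays above) and ask for it to be stored in the catalog.
        None => {
            let mut counts = distinct_counts.to_vec();
            counts.sort_by_key(|&(name, n)| (n, name.to_string()));
            let key: Vec<String> = counts
                .into_iter()
                .map(|(name, _)| name.to_string())
                .chain(std::iter::once("time".to_string()))
                .collect();
            (key.clone(), Some(key))
        }
    }
}

fn main() {
    // First persist of a partition: no catalog key, so compute and store one.
    let (file_key, update) =
        choose_sort_key(None, &["host", "env", "time"], &[("env", 3), ("host", 2)]);
    assert_eq!(file_key, ["host", "env", "time"]);
    assert_eq!(update.unwrap(), ["host", "env", "time"]);

    // Later persist with a new tag: the file key and the catalog both gain `temp`.
    let existing = vec!["host".to_string(), "env".to_string(), "time".to_string()];
    let (file_key, update) =
        choose_sort_key(Some(existing), &["host", "temp", "env", "time"], &[]);
    assert_eq!(file_key, ["host", "env", "temp", "time"]);
    assert_eq!(update.unwrap(), ["host", "env", "temp", "time"]);
}
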
@@ -756,10 +756,7 @@ mod tests {
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@@ -800,10 +797,7 @@ mod tests {
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@@ -844,10 +838,7 @@ mod tests {
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@@ -893,10 +884,7 @@ mod tests {
let expected_pk = vec!["tag1", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@@ -939,10 +927,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
@@ -999,10 +984,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
@@ -1067,10 +1049,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
@@ -1128,10 +1107,7 @@ mod tests {
let expected_pk = vec!["tag1", "tag2", "time"];
assert_eq!(expected_pk, pk);
let sort_key = compute_sort_key(
&schema,
compact_batch.data.iter().map(|sb| sb.data.as_ref()),
);
let sort_key = compute_sort_key(&compact_batch);
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact


@@ -24,6 +24,7 @@ mod poison;
pub mod querier_handler;
pub mod query;
pub mod server;
pub mod sort_key;
pub mod stream_handler;
#[cfg(test)]

ingester/src/sort_key.rs (new file)

@@ -0,0 +1,324 @@
//! Functions for computing a sort key based on cardinality of primary key columns.
use crate::data::{QueryableBatch, SnapshotBatch};
use arrow::{
array::{Array, DictionaryArray, StringArray},
datatypes::{DataType, Int32Type},
record_batch::RecordBatch,
};
use observability_deps::tracing::trace;
use query::QueryChunkMeta;
use schema::{
sort::{SortKey, SortKeyBuilder},
TIME_COLUMN_NAME,
};
use std::{
collections::{HashMap, HashSet},
num::NonZeroU64,
sync::Arc,
};
/// Given a `QueryableBatch`, compute a sort key based on:
///
/// - The columns that make up the primary key of the schema of this batch
/// - Order those columns from low cardinality to high cardinality based on the data
/// - Always have the time column last
pub fn compute_sort_key(queryable_batch: &QueryableBatch) -> SortKey {
let schema = queryable_batch.schema();
let primary_key = schema.primary_key();
let cardinalities = distinct_counts(&queryable_batch.data, &primary_key);
trace!(cardinalities=?cardinalities, "cardinalities of columns to compute sort key");
let mut cardinalities: Vec<_> = cardinalities.into_iter().collect();
// Sort by (cardinality, column_name) to have deterministic order if same cardinality
cardinalities.sort_by_cached_key(|x| (x.1, x.0.clone()));
let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1);
for (col, _) in cardinalities {
builder = builder.with_col(col)
}
builder = builder.with_col(TIME_COLUMN_NAME);
let key = builder.build();
trace!(computed_sort_key=?key, "Value of sort key from compute_sort_key");
key
}
/// Takes batches of data and the columns that make up the primary key. Computes the number of
/// distinct values for each primary key column across all batches, also known as "cardinality".
/// Used to determine sort order.
fn distinct_counts(
batches: &[Arc<SnapshotBatch>],
primary_key: &[&str],
) -> HashMap<String, NonZeroU64> {
let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len());
for batch in batches {
for (column, distinct_values) in distinct_values(&batch.data, primary_key) {
let set = distinct_values_across_batches
.entry(column)
.or_insert_with(HashSet::new);
set.extend(distinct_values.into_iter());
}
}
distinct_values_across_batches
.into_iter()
.filter_map(|(column, distinct_values)| {
distinct_values
.len()
.try_into()
.ok()
.and_then(NonZeroU64::new)
.map(|count| (column, count))
})
.collect()
}
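
One detail worth calling out: `distinct_counts` unions the per-batch value sets before counting rather than summing per-batch distinct counts, because the same values typically recur in every batch (the comment in `test_sort_key` below walks through the same 2 + 2 + 2 = 6 pitfall). A tiny, self-contained illustration of the difference, using invented values:

use std::collections::HashSet;

fn main() {
    // Three batches of a `host` tag column; each batch happens to contain the same
    // two values, mirroring the scenario in `test_sort_key` further down this file.
    let batches = vec![vec!["a", "b"], vec!["a", "b"], vec!["a", "b"]];

    // Summing per-batch distinct counts over-counts repeated values: 2 + 2 + 2 = 6.
    let summed: usize = batches
        .iter()
        .map(|batch| batch.iter().collect::<HashSet<_>>().len())
        .sum();
    assert_eq!(summed, 6);

    // Unioning the value sets across batches first, as `distinct_counts` does,
    // yields the true cardinality: {"a", "b"} has 2 elements.
    let unioned = batches
        .iter()
        .flat_map(|batch| batch.iter())
        .collect::<HashSet<_>>()
        .len();
    assert_eq!(unioned, 2);
}
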
/// Takes a `RecordBatch` and the column names that make up the primary key of the schema. Returns
/// a map of column names to the set of the distinct string values, for the specified columns. Used
/// to compute cardinality across multiple `RecordBatch`es.
fn distinct_values(batch: &RecordBatch, primary_key: &[&str]) -> HashMap<String, HashSet<String>> {
let schema = batch.schema();
batch
.columns()
.iter()
.zip(schema.fields())
.filter(|(_col, field)| primary_key.contains(&field.name().as_str()))
.flat_map(|(col, field)| match field.data_type() {
// Dictionaries of I32 => Utf8 are supported as tags in
// `schema::InfluxColumnType::valid_arrow_type`
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
let col = col
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.expect("unexpected datatype");
let values = col.values();
let values = values
.as_any()
.downcast_ref::<StringArray>()
.expect("unexpected datatype");
Some((
field.name().into(),
values.iter().flatten().map(ToString::to_string).collect(),
))
}
// Utf8 types are supported as tags
DataType::Utf8 => {
let values = col
.as_any()
.downcast_ref::<StringArray>()
.expect("unexpected datatype");
Some((
field.name().into(),
values.iter().flatten().map(ToString::to_string).collect(),
))
}
// No other data types are supported as tags; don't compute distinct values for them
_ => None,
})
.collect()
}
/// Given a sort key from the catalog and the primary key (tags + time) from the data, return the
/// sort key that should be used for this parquet file and, if needed, the sort key that should
/// be updated in the catalog. These are computed as follows:
///
/// - Columns that appear in both the primary key and the catalog sort key should appear in the
/// same order as they appear in the catalog sort key.
/// - If there are new columns that appear in the primary key, add the new columns to the end of
/// the catalog sort key's tag list. Also return an updated catalog sort key to save the new
/// column in the catalog.
/// - If there are columns that appear in the catalog sort key but aren't present in this data's
/// primary key, don't include them in the sort key to be used for this data. Don't remove them
/// from the catalog sort key.
pub fn adjust_sort_key_columns(
catalog_sort_key: &SortKey,
primary_key: &[&str],
) -> (SortKey, Option<SortKey>) {
let existing_columns_without_time = catalog_sort_key
.iter()
.map(|(col, _opts)| col)
.cloned()
.filter(|col| TIME_COLUMN_NAME != col.as_ref());
let new_columns: Vec<_> = primary_key
.iter()
.filter(|col| !catalog_sort_key.contains(col))
.collect();
let metadata_sort_key = SortKey::from_columns(
existing_columns_without_time
.clone()
.filter(|col| primary_key.contains(&col.as_ref()))
.chain(new_columns.iter().map(|&&col| Arc::from(col)))
.chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
);
let catalog_update = if new_columns.is_empty() {
None
} else {
Some(SortKey::from_columns(
existing_columns_without_time
.chain(new_columns.into_iter().map(|&col| Arc::from(col)))
.chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
))
};
(metadata_sort_key, catalog_update)
}
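
A consequence of the rules above is that the catalog sort key only ever grows: over successive persists it becomes the union of all tag columns the ingester has seen for the partition, with `time` kept last (the property the compactor relied on before this revert). Below is a small, std-only toy of that accumulation, with `Vec<&str>` standing in for `SortKey`, a hypothetical `grow_catalog_key` helper, and invented tag names:

// Toy model, not IOx code: mirrors only the "append new tags before time, never
// remove columns" rule from `adjust_sort_key_columns`.
fn grow_catalog_key<'a>(catalog_key: &mut Vec<&'a str>, data_primary_key: &[&'a str]) {
    for col in data_primary_key {
        if *col != "time" && !catalog_key.contains(col) {
            let before_time = catalog_key.len() - 1; // "time" is always the last entry
            catalog_key.insert(before_time, *col);
        }
    }
}

fn main() {
    // First persist: the catalog key starts as this batch's own key.
    let mut catalog_key = vec!["host", "time"];

    // A later batch introduces `env`; the catalog key gains it.
    grow_catalog_key(&mut catalog_key, &["host", "env", "time"]);
    assert_eq!(catalog_key, ["host", "env", "time"]);

    // A batch without `env` changes nothing: columns are never removed.
    grow_catalog_key(&mut catalog_key, &["host", "time"]);
    assert_eq!(catalog_key, ["host", "env", "time"]);

    // Each new tag is appended just before `time`, so the key ends up as the union
    // of all tags seen so far.
    grow_catalog_key(&mut catalog_key, &["host", "zone", "time"]);
    assert_eq!(catalog_key, ["host", "env", "zone", "time"]);
}
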
#[cfg(test)]
mod tests {
use super::*;
use data_types2::SequenceNumber;
use schema::selection::Selection;
fn lp_to_queryable_batch(line_protocol_batches: &[&str]) -> QueryableBatch {
let data = line_protocol_batches
.iter()
.map(|line_protocol| {
let (_, mb) = mutable_batch_lp::test_helpers::lp_to_mutable_batch(line_protocol);
let rb = mb.to_arrow(Selection::All).unwrap();
Arc::new(SnapshotBatch {
min_sequencer_number: SequenceNumber::new(0),
max_sequencer_number: SequenceNumber::new(1),
data: Arc::new(rb),
})
})
.collect();
QueryableBatch {
data,
delete_predicates: Default::default(),
table_name: Default::default(),
}
}
#[test]
fn test_distinct_values() {
let lp = r#"
cpu,host=a val=23 1
cpu,host=b,env=prod val=2 1
cpu,host=c,env=stage val=11 1
cpu,host=a,env=prod val=14 2
"#;
let qb = lp_to_queryable_batch(&[lp]);
let rb = &qb.data[0].data;
// Pass the tag field names plus time as the primary key; this is what should happen
let distinct = distinct_values(rb, &["host", "env", "time"]);
// The hashmap should contain the distinct values for "host" and "env" only
assert_eq!(distinct.len(), 2);
// Return unique values
assert_eq!(
*distinct.get("host").unwrap(),
HashSet::from(["a".into(), "b".into(), "c".into()]),
);
// TODO: do nulls count as a value?
assert_eq!(
*distinct.get("env").unwrap(),
HashSet::from(["prod".into(), "stage".into()]),
);
// Requesting a column not present returns None
assert_eq!(distinct.get("foo"), None);
// Distinct count isn't computed for the time column or fields
assert_eq!(distinct.get("time"), None);
assert_eq!(distinct.get("val"), None);
// Specify a column in the primary key that doesn't appear in the data
let distinct = distinct_values(rb, &["host", "env", "foo", "time"]);
// The hashmap should contain the distinct values for "host" and "env" only
assert_eq!(distinct.len(), 2);
// Don't specify one of the tag columns for the primary key
let distinct = distinct_values(rb, &["host", "foo", "time"]);
// The hashmap should contain the distinct values for the specified columns only
assert_eq!(distinct.len(), 1);
}
#[test]
fn test_sort_key() {
// Across these three record batches:
// - `host` has 2 distinct values: "a", "b"
// - 'env' has 3 distinct values: "prod", "stage", "dev"
// host's 2 values appear in each record batch, so the distinct counts could be incorrectly
// aggregated together as 2 + 2 + 2 = 6. env's 3 values each occur in their own record
// batch, so they should always be aggregated as 3.
// host has the lower cardinality, so it should appear first in the sort key.
let lp1 = r#"
cpu,host=a,env=prod val=23 1
cpu,host=b,env=prod val=2 2
"#;
let lp2 = r#"
cpu,host=a,env=stage val=23 3
cpu,host=b,env=stage val=2 4
"#;
let lp3 = r#"
cpu,host=a,env=dev val=23 5
cpu,host=b,env=dev val=2 6
"#;
let qb = lp_to_queryable_batch(&[lp1, lp2, lp3]);
let sort_key = compute_sort_key(&qb);
assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"]));
}
#[test]
fn test_adjust_sort_key_columns() {
// If the catalog sort key is the same as the primary key, no changes
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "env", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, catalog_sort_key);
assert!(update.is_none());
// If the catalog sort key contains more columns than the primary key, the metadata key
// should only contain the columns in the data and the catalog should not be updated
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, SortKey::from_columns(data_primary_key));
assert!(update.is_none());
// If the catalog sort key contains fewer columns than the primary key, add the new columns
// just before the time column and update the catalog
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "temp", "env", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
let expected = SortKey::from_columns(["host", "env", "temp", "time"]);
assert_eq!(metadata, expected);
assert_eq!(update.unwrap(), expected);
// If the sort key contains a column that doesn't exist in the data and is missing a column,
// the metadata key should only contain the columns in the data and the catalog should be
// updated to include the new column (but not remove the missing column)
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "temp", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, SortKey::from_columns(data_primary_key));
let expected = SortKey::from_columns(["host", "env", "temp", "time"]);
assert_eq!(update.unwrap(), expected);
}
}


@@ -7,11 +7,11 @@ use arrow::{
use bytes::Bytes;
use data_types2::{
Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams,
Partition, PartitionId, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId,
Timestamp, Tombstone, TombstoneId,
Partition, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
Tombstone, TombstoneId,
};
use iox_catalog::{
interface::{Catalog, PartitionRepo, INITIAL_COMPACTION_LEVEL},
interface::{Catalog, INITIAL_COMPACTION_LEVEL},
mem::MemCatalog,
};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
@@ -22,7 +22,7 @@ use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
use query::exec::Executor;
use schema::{
selection::Selection,
sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
sort::{SortKey, SortKeyBuilder},
};
use std::sync::Arc;
use time::{MockProvider, Time, TimeProvider};
@@ -435,7 +435,7 @@ impl TestPartition {
max_sequence_number,
row_count: row_count as i64,
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: Some(sort_key.clone()),
sort_key: Some(sort_key),
};
let (parquet_metadata_bin, real_file_size_bytes) =
create_parquet_file(&self.catalog.object_store, &metadata, record_batch).await;
@@ -462,8 +462,6 @@ impl TestPartition {
.await
.unwrap();
update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await;
Arc::new(TestParquetFile {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
@@ -509,7 +507,7 @@ impl TestPartition {
max_sequence_number,
row_count: row_count as i64,
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: Some(sort_key.clone()),
sort_key: Some(sort_key),
};
let (parquet_metadata_bin, _real_file_size_bytes) =
create_parquet_file(&self.catalog.object_store, &metadata, record_batch).await;
@@ -536,8 +534,6 @@ impl TestPartition {
.await
.unwrap();
update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await;
Arc::new(TestParquetFile {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
@@ -546,49 +542,6 @@
}
}
async fn update_catalog_sort_key_if_needed(
partitions_catalog: &mut dyn PartitionRepo,
partition_id: PartitionId,
sort_key: SortKey,
) {
// Fetch the latest partition info from the catalog
let partition = partitions_catalog
.get_by_id(partition_id)
.await
.unwrap()
.unwrap();
// Similarly to what the ingester does, if there's an existing sort key in the catalog, add new
// columns onto the end
match partition.sort_key() {
Some(catalog_sort_key) => {
let sort_key_string = sort_key.to_columns();
let new_sort_key: Vec<_> = sort_key_string.split(',').collect();
let (_metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &new_sort_key);
if let Some(new_sort_key) = update {
let new_columns = new_sort_key.to_columns();
dbg!(
"Updating sort key from {:?} to {:?}",
catalog_sort_key.to_columns(),
&new_columns,
);
partitions_catalog
.update_sort_key(partition_id, &new_columns)
.await
.unwrap();
}
}
None => {
let new_columns = sort_key.to_columns();
dbg!("Updating sort key from None to {:?}", &new_columns,);
partitions_catalog
.update_sort_key(partition_id, &new_columns)
.await
.unwrap();
}
}
}
/// Create a parquet file and return its thrift-encoded, zstd-compressed parquet metadata as well as the file size.
async fn create_parquet_file(
object_store: &Arc<DynObjectStore>,


@@ -442,9 +442,7 @@ impl Deduplicater {
// the primary key, in order for deduplication to work correctly
assert!(
pk_schema.len() <= sort_key.len(),
"output_sort_key ({:?}) must be at least as long as the primary key ({:?})",
sort_key.to_columns(),
pk_schema,
"output_sort_key must be at least as long as the primary key"
);
assert!(
pk_schema.is_sorted_on_pk(sort_key),


@@ -1,20 +1,12 @@
use crate::{Schema, TIME_COLUMN_NAME};
use std::sync::Arc;
use std::{fmt::Display, str::FromStr};
use arrow::compute::SortOptions;
use arrow::{
array::{Array, DictionaryArray, StringArray},
datatypes::{DataType, Int32Type},
record_batch::RecordBatch,
};
use indexmap::{map::Iter, IndexMap};
use itertools::Itertools;
use snafu::Snafu;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
num::NonZeroU64,
str::FromStr,
sync::Arc,
};
use super::TIME_COLUMN_NAME;
#[derive(Debug, Snafu)]
pub enum Error {
@@ -203,32 +195,6 @@ impl SortKey {
self.columns.is_empty()
}
/// Filters this sort key to contain only the columns present in the primary key, in the order
/// that the columns appear in this sort key.
///
/// # Panics
///
/// Panics if any columns in the primary key are NOT present in this sort key.
pub fn filter_to(&self, primary_key: &[&str]) -> SortKey {
let missing_from_catalog_key: Vec<_> = primary_key
.iter()
.filter(|col| !self.contains(col))
.collect();
if !missing_from_catalog_key.is_empty() {
panic!(
"Primary key column(s) found that don't appear in the catalog sort key: [{:?}]",
missing_from_catalog_key
)
}
Self::from_columns(
self.iter()
.map(|(col, _opts)| col)
.filter(|col| primary_key.contains(&col.as_ref()))
.cloned(),
)
}
/// Returns the merge key of the 2 given keys if one covers the other; returns None otherwise.
/// Key1 is said to cover key2 if key2 is a subset of key1's columns, in the same order.
/// Examples:
@@ -322,164 +288,9 @@ impl Display for SortKey {
}
}
/// Given a `Schema` and an iterator of `RecordBatch`es, compute a sort key based on:
///
/// - The columns that make up the primary key of the schema
/// - Order those columns from low cardinality to high cardinality based on the data
/// - Always have the time column last
pub fn compute_sort_key<'a>(
schema: &Schema,
batches: impl Iterator<Item = &'a RecordBatch>,
) -> SortKey {
let primary_key = schema.primary_key();
let cardinalities = distinct_counts(batches, &primary_key);
let mut cardinalities: Vec<_> = cardinalities.into_iter().collect();
// Sort by (cardinality, column_name) to have deterministic order if same cardinality
cardinalities.sort_by_cached_key(|x| (x.1, x.0.clone()));
let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1);
for (col, _) in cardinalities {
builder = builder.with_col(col)
}
builder = builder.with_col(TIME_COLUMN_NAME);
builder.build()
}
/// Takes batches of data and the columns that make up the primary key. Computes the number of
/// distinct values for each primary key column across all batches, also known as "cardinality".
/// Used to determine sort order.
fn distinct_counts<'a>(
batches: impl Iterator<Item = &'a RecordBatch>,
primary_key: &[&str],
) -> HashMap<String, NonZeroU64> {
let mut distinct_values_across_batches = HashMap::with_capacity(primary_key.len());
for batch in batches {
for (column, distinct_values) in distinct_values(batch, primary_key) {
let set = distinct_values_across_batches
.entry(column)
.or_insert_with(HashSet::new);
set.extend(distinct_values.into_iter());
}
}
distinct_values_across_batches
.into_iter()
.filter_map(|(column, distinct_values)| {
distinct_values
.len()
.try_into()
.ok()
.and_then(NonZeroU64::new)
.map(|count| (column, count))
})
.collect()
}
/// Takes a `RecordBatch` and the column names that make up the primary key of the schema. Returns
/// a map of column names to the set of the distinct string values, for the specified columns. Used
/// to compute cardinality across multiple `RecordBatch`es.
fn distinct_values(batch: &RecordBatch, primary_key: &[&str]) -> HashMap<String, HashSet<String>> {
let schema = batch.schema();
batch
.columns()
.iter()
.zip(schema.fields())
.filter(|(_col, field)| primary_key.contains(&field.name().as_str()))
.flat_map(|(col, field)| match field.data_type() {
// Dictionaries of I32 => Utf8 are supported as tags in
// `schema::InfluxColumnType::valid_arrow_type`
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
let col = col
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.expect("unexpected datatype");
let values = col.values();
let values = values
.as_any()
.downcast_ref::<StringArray>()
.expect("unexpected datatype");
Some((
field.name().into(),
values.iter().flatten().map(ToString::to_string).collect(),
))
}
// Utf8 types are supported as tags
DataType::Utf8 => {
let values = col
.as_any()
.downcast_ref::<StringArray>()
.expect("unexpected datatype");
Some((
field.name().into(),
values.iter().flatten().map(ToString::to_string).collect(),
))
}
// No other data types are supported as tags; don't compute distinct values for them
_ => None,
})
.collect()
}
/// Given a sort key from the catalog and the primary key (tags + time) from the data, return the
/// sort key that should be used for this parquet file and, if needed, the sort key that should
/// be updated in the catalog. These are computed as follows:
///
/// - Columns that appear in both the primary key and the catalog sort key should appear in the
/// same order as they appear in the catalog sort key.
/// - If there are new columns that appear in the primary key, add the new columns to the end of
/// the catalog sort key's tag list. Also return an updated catalog sort key to save the new
/// column in the catalog.
/// - If there are columns that appear in the catalog sort key but aren't present in this data's
/// primary key, don't include them in the sort key to be used for this data. Don't remove them
/// from the catalog sort key.
pub fn adjust_sort_key_columns(
catalog_sort_key: &SortKey,
primary_key: &[&str],
) -> (SortKey, Option<SortKey>) {
let existing_columns_without_time = catalog_sort_key
.iter()
.map(|(col, _opts)| col)
.cloned()
.filter(|col| TIME_COLUMN_NAME != col.as_ref());
let new_columns: Vec<_> = primary_key
.iter()
.filter(|col| !catalog_sort_key.contains(col))
.collect();
let metadata_sort_key = SortKey::from_columns(
existing_columns_without_time
.clone()
.filter(|col| primary_key.contains(&col.as_ref()))
.chain(new_columns.iter().map(|&&col| Arc::from(col)))
.chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
);
let catalog_update = if new_columns.is_empty() {
None
} else {
Some(SortKey::from_columns(
existing_columns_without_time
.chain(new_columns.into_iter().map(|&col| Arc::from(col)))
.chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
))
};
(metadata_sort_key, catalog_update)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::builder::SchemaBuilder;
use arrow::array::ArrayRef;
#[test]
fn test_parse() {
@@ -697,182 +508,4 @@ mod tests {
let merge_key = SortKey::try_merge_key(&key_abc_2, &key_bac_2);
assert_eq!(merge_key, None);
}
fn to_string_array(values: impl Into<StringArray>) -> ArrayRef {
Arc::new(values.into()) as ArrayRef
}
#[test]
fn test_distinct_values() {
let rb = RecordBatch::try_from_iter(vec![
("host", to_string_array(vec!["a", "b", "c", "a"])),
(
"env",
to_string_array(vec![None, Some("prod"), Some("stage"), Some("prod")]),
),
])
.unwrap();
// Pass the tag field names plus time as the primary key; this is what should happen
let distinct = distinct_values(&rb, &["host", "env", "time"]);
// The hashmap should contain the distinct values for "host" and "env" only
assert_eq!(distinct.len(), 2);
// Return unique values
assert_eq!(
*distinct.get("host").unwrap(),
HashSet::from(["a".into(), "b".into(), "c".into()]),
);
// TODO: do nulls count as a value?
assert_eq!(
*distinct.get("env").unwrap(),
HashSet::from(["prod".into(), "stage".into()]),
);
// Requesting a column not present returns None
assert_eq!(distinct.get("foo"), None);
// Distinct count isn't computed for the time column or fields
assert_eq!(distinct.get("time"), None);
assert_eq!(distinct.get("val"), None);
// Specify a column in the primary key that doesn't appear in the data
let distinct = distinct_values(&rb, &["host", "env", "foo", "time"]);
// The hashmap should contain the distinct values for "host" and "env" only
assert_eq!(distinct.len(), 2);
// Don't specify one of the tag columns for the primary key
let distinct = distinct_values(&rb, &["host", "foo", "time"]);
// The hashmap should contain the distinct values for the specified columns only
assert_eq!(distinct.len(), 1);
}
#[test]
fn test_sort_key() {
// Across these three record batches:
// - `host` has 2 distinct values: "a", "b"
// - 'env' has 3 distinct values: "prod", "stage", "dev"
// host's 2 values appear in each record batch, so the distinct counts could be incorrectly
// aggregated together as 2 + 2 + 2 = 6. env's 3 values each occur in their own record
// batch, so they should always be aggregated as 3.
// host has the lower cardinality, so it should appear first in the sort key.
let rb1 = Arc::new(
RecordBatch::try_from_iter(vec![
("host", to_string_array(vec!["a", "b"])),
("env", to_string_array(vec!["prod", "prod"])),
])
.unwrap(),
);
let rb2 = Arc::new(
RecordBatch::try_from_iter(vec![
("host", to_string_array(vec!["a", "b"])),
("env", to_string_array(vec!["stage", "stage"])),
])
.unwrap(),
);
let rb3 = Arc::new(
RecordBatch::try_from_iter(vec![
("host", to_string_array(vec!["a", "b"])),
("env", to_string_array(vec!["dev", "dev"])),
])
.unwrap(),
);
let rbs = [rb1, rb2, rb3];
let schema = SchemaBuilder::new()
.tag("host")
.tag("env")
.timestamp()
.build()
.unwrap();
let sort_key = compute_sort_key(&schema, rbs.iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"]));
}
#[test]
fn test_adjust_sort_key_columns() {
// If the catalog sort key is the same as the primary key, no changes
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "env", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, catalog_sort_key);
assert!(update.is_none());
// If the catalog sort key contains more columns than the primary key, the metadata key
// should only contain the columns in the data and the catalog should not be updated
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, SortKey::from_columns(data_primary_key));
assert!(update.is_none());
// If the catalog sort key contains fewer columns than the primary key, add the new columns
// just before the time column and update the catalog
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "temp", "env", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
let expected = SortKey::from_columns(["host", "env", "temp", "time"]);
assert_eq!(metadata, expected);
assert_eq!(update.unwrap(), expected);
// If the sort key contains a column that doesn't exist in the data and is missing a column,
// the metadata key should only contain the columns in the data and the catalog should be
// updated to include the new column (but not remove the missing column)
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "temp", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, SortKey::from_columns(data_primary_key));
let expected = SortKey::from_columns(["host", "env", "temp", "time"]);
assert_eq!(update.unwrap(), expected);
}
#[test]
fn test_filter_to_primary_key() {
// If the catalog sort key is the same as the primary key, no changes
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "env", "time"];
let filtered = catalog_sort_key.filter_to(&data_primary_key);
assert_eq!(catalog_sort_key, filtered);
// If the catalog sort key contains more columns than the primary key, the filtered key
// should only contain the columns in the primary key
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "time"];
let filtered = catalog_sort_key.filter_to(&data_primary_key);
let expected = SortKey::from_columns(["host", "time"]);
assert_eq!(expected, filtered);
// If the catalog sort key has columns in a different order than the primary key, the
// filtered key should contain the columns in the same order as the catalog sort key.
let catalog_sort_key = SortKey::from_columns(["host", "env", "zone", "time"]);
let data_primary_key = ["env", "host", "time"];
let filtered = catalog_sort_key.filter_to(&data_primary_key);
let expected = SortKey::from_columns(["host", "env", "time"]);
assert_eq!(expected, filtered);
}
#[test]
#[should_panic]
fn test_filter_missing_columns() {
// If the primary key has columns that are missing from the catalog sort key, panic.
// This should never happen because the ingester should save the sort key as the union
// of all primary key columns it has seen, and the compactor shouldn't get data that hasn't
// been through the ingester.
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "env", "zone", "time"];
catalog_sort_key.filter_to(&data_primary_key);
}
}