Merge branch 'main' into dom/ingester-op-instrumentation

pull/24376/head
kodiakhq[bot] 2022-04-07 16:33:25 +00:00 committed by GitHub
commit 8bd0bfb669
14 changed files with 720 additions and 59 deletions


@ -413,6 +413,8 @@ impl Compactor {
// compact
let split_compacted_files = self.compact(group.parquet_files).await?;
debug!("compacted files");
let mut catalog_update_info = Vec::with_capacity(split_compacted_files.len());
for split_file in split_compacted_files {
@ -589,6 +591,11 @@ impl Compactor {
})
.collect();
debug!(
n_query_chunks = query_chunks.len(),
"gathered parquet data to compact"
);
// Compute min & max sequence numbers and time
// unwrap here will work because the length of query_chunks is already >= 1
let (head, tail) = query_chunks.split_first().unwrap();
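
The `split_first().unwrap()` here is only safe because the surrounding code guarantees at least one query chunk. As a rough, self-contained sketch of the same non-empty min/max fold (plain `i64` values and a hypothetical `min_max_seq` helper stand in for the real sequence-number types):

    // Minimal sketch: fold min/max over a slice the caller guarantees is non-empty.
    // The values are made-up stand-ins for per-chunk sequence numbers.
    fn min_max_seq(seqs: &[i64]) -> (i64, i64) {
        // `split_first` cannot fail because the slice is non-empty.
        let (head, tail) = seqs.split_first().unwrap();
        tail.iter()
            .fold((*head, *head), |(min, max), &s| (min.min(s), max.max(s)))
    }

    fn main() {
        let (min_seq, max_seq) = min_max_seq(&[7, 3, 9, 5]);
        assert_eq!((min_seq, max_seq), (3, 9));
    }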


@ -9,7 +9,7 @@ use futures::{
};
use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
use observability_deps::tracing::{info, warn};
use observability_deps::tracing::*;
use query::exec::Executor;
use std::sync::Arc;
use thiserror::Error;
@ -163,28 +163,33 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
.await
.expect("retry forever");
let n_candidates = candidates.len();
debug!(n_candidates, "found compaction candidates");
let mut used_size = 0;
let max_size = compactor.config.max_concurrent_compaction_size_bytes();
let max_file_size = compactor.config.compaction_max_size_bytes();
let mut handles = vec![];
for c in &candidates {
for c in candidates {
let compactor = Arc::clone(&compactor);
let partition_id = c.partition_id;
let handle = tokio::task::spawn(async move {
debug!(candidate=?c, "compacting candidate");
if let Err(e) = compactor
.compact_partition(partition_id, max_file_size)
.compact_partition(c.partition_id, max_file_size)
.await
{
warn!(
"compaction on partition {} failed with: {:?}",
partition_id, e
c.partition_id, e
);
}
debug!(candidate=?c, "compaction complete");
});
used_size += c.file_size_bytes;
handles.push(handle);
if used_size > max_size {
debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit");
break;
}
}
@ -194,7 +199,7 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
let _ = futures::future::join_all(handles).await;
// if all candidate partitions have been compacted, wait a bit before checking again
if compactions_run == candidates.len() {
if compactions_run == n_candidates {
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
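
The loop above spawns one compaction task per candidate but stops launching new ones once the running total of candidate sizes exceeds the configured concurrent-compaction byte budget. A minimal, non-async sketch of that budgeting behaviour, with hypothetical sizes and a plain counter in place of spawned tokio tasks:

    // Sketch of the size-budgeted scheduling loop; returns how many candidates
    // would be scheduled in one round.
    fn schedule(candidate_sizes: &[i64], max_size: i64) -> usize {
        let mut used_size = 0;
        let mut scheduled = 0;
        for &file_size_bytes in candidate_sizes {
            // the real loop spawns a compaction task for the candidate here
            scheduled += 1;
            used_size += file_size_bytes;
            if used_size > max_size {
                // stop launching new compactions once the byte budget is exceeded
                break;
            }
        }
        scheduled
    }

    fn main() {
        // with a 100-byte budget the third candidate crosses the limit, so only
        // three of the four candidates are scheduled in this round
        assert_eq!(schedule(&[40, 40, 40, 40], 100), 3);
    }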


@ -7,6 +7,7 @@ use data_types2::{
};
use iox_object_store::IoxObjectStore;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use parquet_file::{
chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile},
metadata::{IoxMetadata, IoxParquetMetaData},
@ -18,6 +19,7 @@ use std::{
/// Wrapper of a group of parquet files and their tombstones that overlap in time and should be
/// considered during compaction.
#[derive(Debug)]
pub struct GroupWithTombstones {
/// Each file with the set of tombstones relevant to it
pub(crate) parquet_files: Vec<ParquetFileWithTombstone>,
@ -97,6 +99,11 @@ impl ParquetFileWithTombstone {
Arc::new(iox_object_store),
);
debug!(
parquet_file=?decoded_parquet_file.parquet_file,
"generated parquet chunk from object store"
);
QueryableParquetChunk::new(
table_name,
Arc::new(parquet_chunk),


@ -12,7 +12,7 @@
use influxdb_line_protocol::FieldValue;
use predicate::{delete_predicate::parse_delete_predicate, Predicate};
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema};
use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
use std::{
collections::BTreeMap,
convert::TryFrom,
@ -661,6 +661,15 @@ pub struct Partition {
pub sort_key: Option<String>,
}
impl Partition {
/// The sort key for the partition, if present, structured as a `SortKey`
pub fn sort_key(&self) -> Option<SortKey> {
self.sort_key
.as_ref()
.map(|s| SortKey::from_columns(s.split(',')))
}
}
/// Information for a partition from the catalog.
#[derive(Debug)]
#[allow(missing_docs)]
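
The new `sort_key()` accessor above parses the catalog's comma-separated `sort_key` column back into a `SortKey`; the inverse, `SortKey::to_columns`, appears further down in this diff. A stand-in sketch of that round trip using plain strings and `Vec<String>` instead of the real `SortKey` type (the column names are made up):

    // Stand-in sketch of the catalog round trip: the partition sort key is stored
    // as a comma-separated string and parsed back into an ordered column list.
    fn parse_sort_key(stored: &str) -> Vec<String> {
        stored.split(',').map(|s| s.to_string()).collect()
    }

    fn to_columns(columns: &[String]) -> String {
        columns.join(",")
    }

    fn main() {
        let stored = "tag3,tag1,time"; // hypothetical catalog value
        let columns = parse_sort_key(stored);
        assert_eq!(columns, ["tag3", "tag1", "time"]);
        // serializing back reproduces the stored form, e.g. for a catalog update
        assert_eq!(to_columns(&columns), stored);
    }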


@ -2,10 +2,10 @@
use crate::{
data::{PersistingBatch, QueryableBatch},
sort_key::compute_sort_key,
sort_key::{adjust_sort_key_columns, compute_sort_key},
};
use arrow::record_batch::RecordBatch;
use data_types2::NamespaceId;
use data_types2::{NamespaceId, PartitionInfo};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use parquet_file::metadata::IoxMetadata;
@ -61,21 +61,39 @@ pub async fn compact_persisting_batch(
time_provider: Arc<dyn TimeProvider>,
executor: &Executor,
namespace_id: i32,
namespace_name: &str,
table_name: &str,
partition_key: &str,
partition_info: &PartitionInfo,
batch: Arc<PersistingBatch>,
) -> Result<Option<(Vec<RecordBatch>, IoxMetadata)>> {
) -> Result<Option<(Vec<RecordBatch>, IoxMetadata, Option<SortKey>)>> {
// Nothing to compact
if batch.data.data.is_empty() {
return Ok(None);
}
// Get sort key based on cardinality
let sort_key = compute_sort_key(&batch.data);
let namespace_name = &partition_info.namespace_name;
let table_name = &partition_info.table_name;
let partition_key = &partition_info.partition.partition_key;
let sort_key = partition_info.partition.sort_key();
// Get sort key from the catalog or compute it from cardinality. Save a new value for the
// catalog to store if necessary.
let (metadata_sort_key, sort_key_update) = match sort_key {
Some(sk) => {
// Remove any columns not present in this data from the sort key that will be in this
// parquet file's metadata.
// If there are any new columns, add them to the end of the sort key in the catalog and
// return that to be updated in the catalog.
adjust_sort_key_columns(&sk, &batch.data.schema().primary_key())
}
None => {
let sort_key = compute_sort_key(&batch.data);
// Use the sort key computed from the cardinality as the sort key for this parquet
// file's metadata, also return the sort key to be stored in the catalog
(sort_key.clone(), Some(sort_key))
}
};
// Compact
let stream = compact(executor, Arc::clone(&batch.data), sort_key.clone()).await?;
let stream = compact(executor, Arc::clone(&batch.data), metadata_sort_key.clone()).await?;
// Collect compacted data into record batches for computing statistics
let output_batches = datafusion::physical_plan::common::collect(stream)
.await
@ -102,21 +120,21 @@ pub async fn compact_persisting_batch(
creation_timestamp: time_provider.now(),
sequencer_id: batch.sequencer_id,
namespace_id: NamespaceId::new(namespace_id),
namespace_name: Arc::from(namespace_name),
namespace_name: Arc::from(namespace_name.as_str()),
table_id: batch.table_id,
table_name: Arc::from(table_name),
table_name: Arc::from(table_name.as_str()),
partition_id: batch.partition_id,
partition_key: Arc::from(partition_key),
partition_key: Arc::from(partition_key.as_str()),
time_of_first_write: Time::from_timestamp_nanos(min_time),
time_of_last_write: Time::from_timestamp_nanos(max_time),
min_sequence_number: min_seq,
max_sequence_number: max_seq,
row_count,
compaction_level: INITIAL_COMPACTION_LEVEL,
sort_key: Some(sort_key),
sort_key: Some(metadata_sort_key),
};
Ok(Some((output_batches, meta)))
Ok(Some((output_batches, meta, sort_key_update)))
}
/// Compact a given Queryable Batch
@ -150,7 +168,8 @@ pub async fn compact(
mod tests {
use super::*;
use crate::test_util::{
create_batches_with_influxtype, create_batches_with_influxtype_different_columns,
create_batches_with_influxtype, create_batches_with_influxtype_different_cardinality,
create_batches_with_influxtype_different_columns,
create_batches_with_influxtype_different_columns_different_order,
create_batches_with_influxtype_same_columns_different_type,
create_one_record_batch_with_influxtype_duplicates,
@ -159,6 +178,7 @@ mod tests {
make_persisting_batch, make_queryable_batch, make_queryable_batch_with_deletes,
};
use arrow_util::assert_batches_eq;
use data_types2::{Partition, PartitionId, SequencerId, TableId};
use mutable_batch_lp::lines_to_batches;
use schema::selection::Selection;
use time::SystemProvider;
@ -206,18 +226,23 @@ mod tests {
// compact
let exc = Executor::new(1);
let time_provider = Arc::new(SystemProvider::new());
let (output_batches, _) = compact_persisting_batch(
time_provider,
&exc,
1,
namespace_name,
table_name,
partition_key,
persisting_batch,
)
.await
.unwrap()
.unwrap();
let partition_info = PartitionInfo {
namespace_name: namespace_name.into(),
table_name: table_name.into(),
partition: Partition {
id: PartitionId::new(partition_id),
sequencer_id: SequencerId::new(seq_id),
table_id: TableId::new(table_id),
partition_key: partition_key.into(),
sort_key: None,
},
};
let (output_batches, _, _) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
// verify compacted data
// should be the same as the input but sorted on tag1 & time
@ -267,18 +292,23 @@ mod tests {
// compact
let exc = Executor::new(1);
let time_provider = Arc::new(SystemProvider::new());
let (output_batches, meta) = compact_persisting_batch(
time_provider,
&exc,
1,
namespace_name,
table_name,
partition_key,
persisting_batch,
)
.await
.unwrap()
.unwrap();
let partition_info = PartitionInfo {
namespace_name: namespace_name.into(),
table_name: table_name.into(),
partition: Partition {
id: PartitionId::new(partition_id),
sequencer_id: SequencerId::new(seq_id),
table_id: TableId::new(table_id),
partition_key: partition_key.into(),
sort_key: None,
},
};
let (output_batches, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
// verify compacted data
// should be the same as the input but sorted on tag1 & time
@ -312,6 +342,402 @@ mod tests {
Some(SortKey::from_columns(["tag1", "time"])),
);
assert_eq!(expected_meta, meta);
assert_eq!(
sort_key_update.unwrap(),
SortKey::from_columns(["tag1", "time"])
);
}
#[tokio::test]
async fn test_compact_persisting_batch_no_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let namespace_name = "test_namespace";
let partition_key = "test_partition_key";
let table_name = "test_table";
let seq_id = 1;
let seq_num_start: i64 = 1;
let seq_num_end: i64 = seq_num_start; // one batch
let namespace_id = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
seq_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
vec![],
);
// verify PK
let schema = persisting_batch.data.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
// compact
let exc = Executor::new(1);
let time_provider = Arc::new(SystemProvider::new());
let partition_info = PartitionInfo {
namespace_name: namespace_name.into(),
table_name: table_name.into(),
partition: Partition {
id: PartitionId::new(partition_id),
sequencer_id: SequencerId::new(seq_id),
table_id: TableId::new(table_id),
partition_key: partition_key.into(),
// NO SORT KEY from the catalog here, first persisting batch
sort_key: None,
},
};
let (output_batches, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
// verify compacted data
// should be the same as the input but sorted on the computed sort key of tag1, tag3, & time
let expected_data = vec![
"+-----------+------+------+-----------------------------+",
"| field_int | tag1 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |",
"| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |",
"| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |",
"| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |",
"+-----------+------+------+-----------------------------+",
];
assert_batches_eq!(&expected_data, &output_batches);
let expected_meta = make_meta(
uuid,
meta.creation_timestamp,
seq_id,
namespace_id,
namespace_name,
table_id,
table_name,
partition_id,
partition_key,
28000,
220000,
seq_num_start,
seq_num_end,
4,
INITIAL_COMPACTION_LEVEL,
// Sort key should now be set
Some(SortKey::from_columns(["tag1", "tag3", "time"])),
);
assert_eq!(expected_meta, meta);
assert_eq!(
sort_key_update.unwrap(),
SortKey::from_columns(["tag1", "tag3", "time"])
);
}
#[tokio::test]
async fn test_compact_persisting_batch_with_specified_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let namespace_name = "test_namespace";
let partition_key = "test_partition_key";
let table_name = "test_table";
let seq_id = 1;
let seq_num_start: i64 = 1;
let seq_num_end: i64 = seq_num_start; // one batch
let namespace_id = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
seq_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
vec![],
);
// verify PK
let schema = persisting_batch.data.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
// compact
let exc = Executor::new(1);
let time_provider = Arc::new(SystemProvider::new());
let partition_info = PartitionInfo {
namespace_name: namespace_name.into(),
table_name: table_name.into(),
partition: Partition {
id: PartitionId::new(partition_id),
sequencer_id: SequencerId::new(seq_id),
table_id: TableId::new(table_id),
partition_key: partition_key.into(),
// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
// this is NOT what the computed sort key would be based on this data's cardinality
sort_key: Some("tag3,tag1,time".into()),
},
};
let (output_batches, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
// verify compacted data
// should be the same as the input but sorted on the specified sort key of tag3, tag1, &
// time
let expected_data = vec![
"+-----------+------+------+-----------------------------+",
"| field_int | tag1 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |",
"| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |",
"| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |",
"| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |",
"+-----------+------+------+-----------------------------+",
];
assert_batches_eq!(&expected_data, &output_batches);
let expected_meta = make_meta(
uuid,
meta.creation_timestamp,
seq_id,
namespace_id,
namespace_name,
table_id,
table_name,
partition_id,
partition_key,
28000,
220000,
seq_num_start,
seq_num_end,
4,
INITIAL_COMPACTION_LEVEL,
// The sort key in the metadata should be the same as specified (that is, not
// recomputed)
Some(SortKey::from_columns(["tag3", "tag1", "time"])),
);
assert_eq!(expected_meta, meta);
// The sort key does not need to be updated in the catalog
assert!(sort_key_update.is_none());
}
#[tokio::test]
async fn test_compact_persisting_batch_new_column_for_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let namespace_name = "test_namespace";
let partition_key = "test_partition_key";
let table_name = "test_table";
let seq_id = 1;
let seq_num_start: i64 = 1;
let seq_num_end: i64 = seq_num_start; // one batch
let namespace_id = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
seq_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
vec![],
);
// verify PK
let schema = persisting_batch.data.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
// compact
let exc = Executor::new(1);
let time_provider = Arc::new(SystemProvider::new());
let partition_info = PartitionInfo {
namespace_name: namespace_name.into(),
table_name: table_name.into(),
partition: Partition {
id: PartitionId::new(partition_id),
sequencer_id: SequencerId::new(seq_id),
table_id: TableId::new(table_id),
partition_key: partition_key.into(),
// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
// this is NOT what the computed sort key would be based on this data's cardinality
// The new column, tag1, should get added just before the time column
sort_key: Some("tag3,time".into()),
},
};
let (output_batches, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
// verify compacted data
// should be the same as the input but sorted on the adjusted sort key of tag3, tag1, &
// time
let expected_data = vec![
"+-----------+------+------+-----------------------------+",
"| field_int | tag1 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |",
"| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |",
"| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |",
"| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |",
"+-----------+------+------+-----------------------------+",
];
assert_batches_eq!(&expected_data, &output_batches);
let expected_meta = make_meta(
uuid,
meta.creation_timestamp,
seq_id,
namespace_id,
namespace_name,
table_id,
table_name,
partition_id,
partition_key,
28000,
220000,
seq_num_start,
seq_num_end,
4,
INITIAL_COMPACTION_LEVEL,
// The sort key in the metadata should be updated to include the new column just before
// the time column
Some(SortKey::from_columns(["tag3", "tag1", "time"])),
);
assert_eq!(expected_meta, meta);
// The sort key in the catalog needs to be updated to include the new column
assert_eq!(
sort_key_update.unwrap(),
SortKey::from_columns(["tag3", "tag1", "time"])
);
}
#[tokio::test]
async fn test_compact_persisting_batch_missing_column_for_sort_key() {
// create input data
let batches = create_batches_with_influxtype_different_cardinality().await;
// build persisting batch from the input batches
let uuid = Uuid::new_v4();
let namespace_name = "test_namespace";
let partition_key = "test_partition_key";
let table_name = "test_table";
let seq_id = 1;
let seq_num_start: i64 = 1;
let seq_num_end: i64 = seq_num_start; // one batch
let namespace_id = 1;
let table_id = 1;
let partition_id = 1;
let persisting_batch = make_persisting_batch(
seq_id,
seq_num_start,
table_id,
table_name,
partition_id,
uuid,
batches,
vec![],
);
// verify PK
let schema = persisting_batch.data.schema();
let pk = schema.primary_key();
let expected_pk = vec!["tag1", "tag3", "time"];
assert_eq!(expected_pk, pk);
// compact
let exc = Executor::new(1);
let time_provider = Arc::new(SystemProvider::new());
let partition_info = PartitionInfo {
namespace_name: namespace_name.into(),
table_name: table_name.into(),
partition: Partition {
id: PartitionId::new(partition_id),
sequencer_id: SequencerId::new(seq_id),
table_id: TableId::new(table_id),
partition_key: partition_key.into(),
// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
// this is NOT what the computed sort key would be based on this data's cardinality
// This sort key contains a column, "tag4", that doesn't appear in the data.
sort_key: Some("tag3,tag1,tag4,time".into()),
},
};
let (output_batches, meta, sort_key_update) =
compact_persisting_batch(time_provider, &exc, 1, &partition_info, persisting_batch)
.await
.unwrap()
.unwrap();
// verify compacted data
// should be the same as the input but sorted on the specified sort key without the
// missing column: tag3, tag1, & time
let expected_data = vec![
"+-----------+------+------+-----------------------------+",
"| field_int | tag1 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |",
"| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |",
"| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |",
"| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |",
"+-----------+------+------+-----------------------------+",
];
assert_batches_eq!(&expected_data, &output_batches);
let expected_meta = make_meta(
uuid,
meta.creation_timestamp,
seq_id,
namespace_id,
namespace_name,
table_id,
table_name,
partition_id,
partition_key,
28000,
220000,
seq_num_start,
seq_num_end,
4,
INITIAL_COMPACTION_LEVEL,
// The sort key in the metadata should only contain the columns in this file
Some(SortKey::from_columns(["tag3", "tag1", "time"])),
);
assert_eq!(expected_meta, meta);
// The sort key in the catalog should NOT get a new value
assert!(sort_key_update.is_none());
}
#[tokio::test]


@ -238,13 +238,11 @@ impl Persister for IngesterData {
if let Some(persisting_batch) = persisting_batch {
// do the CPU intensive work of compaction, de-duplication and sorting
let (record_batches, iox_meta) = match compact_persisting_batch(
let (record_batches, iox_meta, sort_key_update) = match compact_persisting_batch(
Arc::new(SystemProvider::new()),
&self.exec,
namespace.namespace_id.get(),
&partition_info.namespace_name,
&partition_info.table_name,
&partition_info.partition.partition_key,
&partition_info,
Arc::clone(&persisting_batch),
)
.await
@ -281,6 +279,21 @@ impl Persister for IngesterData {
.expect("retry forever");
}
// Update the sort key in the catalog if there are additional columns
if let Some(new_sort_key) = sort_key_update {
let sort_key_string = new_sort_key.to_columns();
Backoff::new(&self.backoff_config)
.retry_all_errors("update_sort_key", || async {
let mut repos = self.catalog.repositories().await;
repos
.partitions()
.update_sort_key(partition_id, &sort_key_string)
.await
})
.await
.expect("retry forever");
}
// and remove the persisted data from memory
namespace
.mark_persisted(
@ -1702,6 +1715,18 @@ mod tests {
table_id = mem_table.table_id;
partition_id = p.id;
}
{
// verify the partition doesn't have a sort key before any data has been persisted
let mut repos = catalog.repositories().await;
let partition_info = repos
.partitions()
.partition_info_by_id(partition_id)
.await
.unwrap()
.unwrap();
assert!(partition_info.partition.sort_key.is_none());
}
data.persist(partition_id).await;
// verify that a file got put into object store
@ -1732,6 +1757,15 @@ mod tests {
assert_eq!(pf.sequencer_id, sequencer1.id);
assert!(pf.to_delete.is_none());
// verify it set a sort key on the partition in the catalog
let partition_info = repos
.partitions()
.partition_info_by_id(partition_id)
.await
.unwrap()
.unwrap();
assert_eq!(partition_info.partition.sort_key.unwrap(), "time");
let mem_table = n.table_data("mem").unwrap();
let mem_table = mem_table.read().await;


@ -129,6 +129,53 @@ fn distinct_values(batch: &RecordBatch, primary_key: &[&str]) -> HashMap<String,
.collect()
}
/// Given a sort key from the catalog and the primary key (tags + time) from the data, return the
/// sort key that should be used for this parquet file and, if needed, the sort key that should
/// be updated in the catalog. These are computed as follows:
///
/// - Columns that appear in both the primary key and the catalog sort key should appear in the
/// same order as they appear in the catalog sort key.
/// - If there are new columns that appear in the primary key, add the new columns to the end of
/// the catalog sort key's tag list, just before the time column. Also return the updated
/// catalog sort key so the new columns can be saved in the catalog.
/// - If there are columns that appear in the catalog sort key but aren't present in this data's
/// primary key, don't include them in the sort key to be used for this data. Don't remove them
/// from the catalog sort key.
pub fn adjust_sort_key_columns(
catalog_sort_key: &SortKey,
primary_key: &[&str],
) -> (SortKey, Option<SortKey>) {
let existing_columns_without_time = catalog_sort_key
.iter()
.map(|(col, _opts)| col)
.cloned()
.filter(|col| TIME_COLUMN_NAME != col.as_ref());
let new_columns: Vec<_> = primary_key
.iter()
.filter(|col| !catalog_sort_key.contains(col))
.collect();
let metadata_sort_key = SortKey::from_columns(
existing_columns_without_time
.clone()
.filter(|col| primary_key.contains(&col.as_ref()))
.chain(new_columns.iter().map(|&&col| Arc::from(col)))
.chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
);
let catalog_update = if new_columns.is_empty() {
None
} else {
Some(SortKey::from_columns(
existing_columns_without_time
.chain(new_columns.into_iter().map(|&col| Arc::from(col)))
.chain(std::iter::once(Arc::from(TIME_COLUMN_NAME))),
))
};
(metadata_sort_key, catalog_update)
}
#[cfg(test)]
mod tests {
use super::*;
@ -230,4 +277,48 @@ mod tests {
assert_eq!(sort_key, SortKey::from_columns(["host", "env", "time"]));
}
#[test]
fn test_adjust_sort_key_columns() {
// If the catalog sort key is the same as the primary key, no changes
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "env", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, catalog_sort_key);
assert!(update.is_none());
// If the catalog sort key contains more columns than the primary key, the metadata key
// should only contain the columns in the data and the catalog should not be updated
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, SortKey::from_columns(data_primary_key));
assert!(update.is_none());
// If the catalog sort key contains fewer columns than the primary key, add the new columns
// just before the time column and update the catalog
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "temp", "env", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
let expected = SortKey::from_columns(["host", "env", "temp", "time"]);
assert_eq!(metadata, expected);
assert_eq!(update.unwrap(), expected);
// If the catalog sort key contains a column that doesn't exist in the data and the data
// has a column that's missing from the catalog sort key, the metadata key should only
// contain the columns in the data and the catalog should be updated to include the new
// column (but not to remove the column that's absent from this data)
let catalog_sort_key = SortKey::from_columns(["host", "env", "time"]);
let data_primary_key = ["host", "temp", "time"];
let (metadata, update) = adjust_sort_key_columns(&catalog_sort_key, &data_primary_key);
assert_eq!(metadata, SortKey::from_columns(data_primary_key));
let expected = SortKey::from_columns(["host", "env", "temp", "time"]);
assert_eq!(update.unwrap(), expected);
}
}


@ -528,6 +528,60 @@ pub async fn create_batches_with_influxtype_different_columns_different_order(
batches
}
/// Has 2 tag columns; tag1 has a lower cardinality (3) than tag3 (4)
pub async fn create_batches_with_influxtype_different_cardinality() -> Vec<Arc<RecordBatch>> {
// Use the available TestChunk to create chunks and then convert them to raw RecordBatches
let mut batches = vec![];
let chunk1 = Arc::new(
TestChunk::new("t")
.with_id(1)
.with_time_column()
.with_tag_column("tag1")
.with_i64_field_column("field_int")
.with_tag_column("tag3")
.with_four_rows_of_data(),
);
let batch1 = raw_data(&[chunk1]).await[0].clone();
let expected = vec![
"+-----------+------+------+-----------------------------+",
"| field_int | tag1 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |",
"| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |",
"| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |",
"| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |",
"+-----------+------+------+-----------------------------+",
];
assert_batches_eq!(&expected, &[batch1.clone()]);
batches.push(Arc::new(batch1.clone()));
let chunk2 = Arc::new(
TestChunk::new("t")
.with_id(2)
.with_time_column()
.with_tag_column("tag1")
.with_tag_column("tag3")
.with_i64_field_column("field_int")
.with_four_rows_of_data(),
);
let batch2 = raw_data(&[chunk2]).await[0].clone();
let expected = vec![
"+-----------+------+------+-----------------------------+",
"| field_int | tag1 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 1000 | WA | TX | 1970-01-01T00:00:00.000028Z |",
"| 10 | VT | PR | 1970-01-01T00:00:00.000210Z |",
"| 70 | UT | OR | 1970-01-01T00:00:00.000220Z |",
"| 50 | VT | AL | 1970-01-01T00:00:00.000210Z |",
"+-----------+------+------+-----------------------------+",
];
assert_batches_eq!(&expected, &[batch2.clone()]);
batches.push(Arc::new(batch2));
batches
}
/// RecordBatches with knowledge of influx metadata
pub async fn create_batches_with_influxtype_same_columns_different_type() -> Vec<Arc<RecordBatch>> {
// Use the available TestChunk to create chunks and then convert them to raw RecordBatches


@ -26,9 +26,22 @@ enum Variant {
/// Location of a Parquet file within a database's object store.
/// The exact format is an implementation detail and is subject to change.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ParquetFilePath(Variant);
impl std::fmt::Debug for ParquetFilePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let path = match self.0 {
Variant::Old { .. } => self.relative_dirs_and_file_name().to_string(),
Variant::New { .. } => self.absolute_dirs_and_file_name().to_string(),
};
f.debug_struct("ParquetFilePath")
.field("inner", &self.0)
.field("resolved_path", &path)
.finish()
}
}
impl ParquetFilePath {
/// Create a location for this chunk's parquet file. Calling this twice on the same `ChunkAddr`
/// will return different `parquet_file::Path`s.
@ -352,6 +365,8 @@ mod tests {
let round_trip =
ParquetFilePath::from_relative_dirs_and_file_name(&dirs_and_file_name).unwrap();
assert_eq!(pfp, round_trip);
assert_eq!(format!("{:?}", pfp), "ParquetFilePath { inner: Old { table_name: \"}*\", partition_key: \"aoeu\", chunk_id: ChunkId(10) }, resolved_path: \"%7D%2A/aoeu/00000000-0000-0000-0000-00000000000a.parquet\" }");
}
#[test]
@ -381,6 +396,8 @@ mod tests {
dirs_and_file_name.to_string(),
"1/2/3/4/00000000-0000-0000-0000-000000000000.parquet".to_string(),
);
assert_eq!(format!("{:?}", pfp), "ParquetFilePath { inner: New { namespace_id: NamespaceId(1), table_id: TableId(2), sequencer_id: SequencerId(3), partition_id: PartitionId(4), object_store_id: 00000000-0000-0000-0000-000000000000 }, resolved_path: \"1/2/3/4/00000000-0000-0000-0000-000000000000.parquet\" }");
}
#[test]


@ -9,6 +9,7 @@ use data_types::{
use data_types2::ParquetFile;
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::*;
use predicate::Predicate;
use schema::selection::Selection;
use schema::{Schema, TIME_COLUMN_NAME};
@ -231,6 +232,7 @@ impl ParquetChunk {
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
debug!(path=?self.path, "fetching parquet data for filtered read");
Storage::read_filter(
predicate,
selection,


@ -12,7 +12,7 @@ use datafusion_util::AdapterStream;
use futures::{stream, Stream, StreamExt};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::GetResult;
use observability_deps::tracing::debug;
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;
@ -300,6 +300,8 @@ impl Storage {
}
}
debug!("Completed parquet download & scan");
Ok(())
}
@ -332,6 +334,7 @@ impl Storage {
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
warn!(error=%e, "Parquet download & scan failed");
let e = ArrowError::ExternalError(Box::new(e));
if let Err(e) = tx.blocking_send(ArrowResult::Err(e)) {
// if no one is listening, there is no one else to hear our screams


@ -436,10 +436,9 @@ impl Deduplicater {
Some(sort_key) => {
// Technically we only require that the sort order is prefixed by
// the primary key, in order for deduplication to work correctly
assert_eq!(
pk_schema.len(),
sort_key.len(),
"output_sort_key must be same length as primary key"
assert!(
pk_schema.len() <= sort_key.len(),
"output_sort_key must be at least as long as the primary key"
);
assert!(
pk_schema.is_sorted_on_pk(sort_key),


@ -188,9 +188,7 @@ impl Schema {
/// Returns true if the sort_key includes all primary key cols
pub fn is_sorted_on_pk(&self, sort_key: &SortKey) -> bool {
self.primary_key()
.iter()
.all(|col| sort_key.get(*col).is_some())
self.primary_key().iter().all(|col| sort_key.contains(col))
}
/// Provide a reference to the underlying Arrow Schema object


@ -144,6 +144,10 @@ impl SortKey {
builder.build()
}
pub fn to_columns(&self) -> String {
self.columns.keys().join(",")
}
/// Gets the ColumnSort for a given column name
pub fn get(&self, column: &str) -> Option<ColumnSort> {
let (sort_ordinal, _, options) = self.columns.get_full(column)?;
@ -171,6 +175,11 @@ impl SortKey {
None
}
/// Return true if this column appears anywhere in the sort key.
pub fn contains(&self, column: &str) -> bool {
self.columns.contains_key(column)
}
/// Returns an iterator over the columns in this key
pub fn iter(&self) -> Iter<'_, Arc<str>, SortOptions> {
self.columns.iter()
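
Taken together, the last three hunks relax the deduplicator's check: the sort key no longer has to match the primary key exactly, it only has to be at least as long as the primary key and contain every primary-key column. A stand-in sketch of those two checks using plain string slices and hypothetical column names, rather than the real `Schema` and `SortKey` types:

    // Stand-in for `SortKey::contains`: membership test on the column name.
    fn contains(sort_key: &[&str], column: &str) -> bool {
        sort_key.iter().any(|c| *c == column)
    }

    // Stand-in for `Schema::is_sorted_on_pk`: every primary-key column must
    // appear somewhere in the sort key.
    fn is_sorted_on_pk(primary_key: &[&str], sort_key: &[&str]) -> bool {
        primary_key.iter().all(|&col| contains(sort_key, col))
    }

    fn main() {
        // hypothetical catalog sort key with an extra column ("tag4") this data doesn't have
        let sort_key = ["tag3", "tag1", "tag4", "time"];
        let primary_key = ["tag1", "tag3", "time"];

        assert!(sort_key.len() >= primary_key.len()); // the relaxed length assertion
        assert!(is_sorted_on_pk(&primary_key, &sort_key)); // every primary-key column is present
    }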