Merge pull request #1902 from influxdata/ntran/avoid_sort_in_scan

feat: store sort_key in the chunk schema after the chunk is sorted
pull/24376/head
kodiakhq[bot] 2021-07-08 20:29:38 +00:00 committed by GitHub
commit 9961cf8008
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 441 additions and 36 deletions

View File

@ -218,6 +218,39 @@ impl Schema {
Ok(record)
}
/// Set the order of sort columns to the specified `sort_key`
pub fn set_sort_key(&mut self, sort_key: &SortKey<'_>) {
let fields = self.inner.fields();
// create a new_fields that are the fields with their sort keys set
let new_fields = fields
.iter()
.map(|field| {
let mut new_field = field.clone();
let mut meta = std::collections::BTreeMap::new();
if let Some(sort) = sort_key.get(field.name()) {
// New sort key
meta.insert(COLUMN_SORT_METADATA_KEY.to_string(), sort.to_string());
}
// Keep other meta data
if let Some(metadata) = field.metadata() {
for (key, value) in metadata {
if key.ne(&COLUMN_SORT_METADATA_KEY.to_string()) {
meta.insert(key.clone(), value.clone());
}
}
}
new_field.set_metadata(Some(meta));
new_field
})
.collect();
let new_meta = self.inner.metadata().clone();
let new_schema = ArrowSchema::new_with_metadata(new_fields, new_meta);
self.inner = Arc::new(new_schema);
}
/// Provide a reference to the underlying Arrow Schema object
pub fn inner(&self) -> &ArrowSchemaRef {
&self.inner

View File

@ -171,6 +171,7 @@ impl SchemaBuilder {
arrow_type: ArrowDataType,
) -> &mut Self {
let field = ArrowField::new(column_name, arrow_type, nullable);
self.fields.push((field, column_type));
self
}

View File

@ -83,7 +83,7 @@ impl std::fmt::Display for ColumnSort {
}
}
#[derive(Debug, Default, Eq, PartialEq)]
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct SortKey<'a> {
columns: IndexMap<&'a str, SortOptions>,
}

View File

@ -8,7 +8,7 @@ use datafusion::{
};
use datafusion_util::AsExpr;
use internal_types::schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use observability_deps::tracing::debug;
use observability_deps::tracing::{debug, trace};
use crate::{
exec::make_stream_split,
@ -73,10 +73,14 @@ impl ReorgPlanner {
let ScanPlan {
plan_builder,
provider,
} = self.scan_and_sort_plan(chunks, output_sort)?;
} = self.scan_and_sort_plan(chunks, output_sort.clone())?;
// TODO: Set sort key on schema
let schema = provider.iox_schema();
let mut schema = provider.iox_schema();
// Set the sort_key of the schema to the compacted chunk's sort key
trace!(input_schema=?schema, "Setting sort key on schema");
schema.set_sort_key(&output_sort);
trace!(output_schema=?schema, "Setting sort key on schema");
let plan = plan_builder.build().context(BuildingPlan)?;

View File

@ -682,6 +682,8 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
chunk: Arc<C>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Todo: check there is sort key and it matches with the given one
//let sort_key = schema.sort_key();
if chunk.is_sorted_on_pk() {
return Ok(input);
}

View File

@ -185,6 +185,40 @@ impl DbSetup for NoData {
/// Two measurements data in a single mutable buffer chunk
#[derive(Debug)]
pub struct TwoMeasurementsMubScenario {}
#[async_trait]
impl DbSetup for TwoMeasurementsMubScenario {
async fn make(&self) -> Vec<DbScenario> {
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
make_one_chunk_mub_scenario(&lp_lines.join("\n")).await
}
}
/// Two measurements data in a single read buffer chunk
#[derive(Debug)]
pub struct TwoMeasurementsRubScenario {}
#[async_trait]
impl DbSetup for TwoMeasurementsRubScenario {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
make_one_chunk_rub_scenario(partition_key, &lp_lines.join("\n")).await
}
}
/// Two measurements data in different chunk scenarios
#[derive(Debug)]
pub struct TwoMeasurements {}
#[async_trait]
impl DbSetup for TwoMeasurements {
@ -201,6 +235,22 @@ impl DbSetup for TwoMeasurements {
}
}
#[derive(Debug)]
pub struct TwoMeasurementsUnsignedTypeMubScenario {}
#[async_trait]
impl DbSetup for TwoMeasurementsUnsignedTypeMubScenario {
async fn make(&self) -> Vec<DbScenario> {
let lp_lines = vec![
"restaurant,town=andover count=40000u 100",
"restaurant,town=reading count=632u 120",
"school,town=reading count=17u 150",
"school,town=andover count=25u 160",
];
make_one_chunk_mub_scenario(&lp_lines.join("\n")).await
}
}
#[derive(Debug)]
pub struct TwoMeasurementsUnsignedType {}
#[async_trait]
@ -587,6 +637,45 @@ impl DbSetup for EndToEndTest {
}
}
/// This function loads one chunk of lp data into MUB only
///
pub(crate) async fn make_one_chunk_mub_scenario(data: &str) -> Vec<DbScenario> {
// Scenario 1: One open chunk in MUB
let db = make_db().await.db;
write_lp(&db, data).await;
let scenario = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
};
vec![scenario]
}
/// This function loads one chunk of lp data into RUB only
///
pub(crate) async fn make_one_chunk_rub_scenario(
partition_key: &str,
data: &str,
) -> Vec<DbScenario> {
// Scenario 1: One closed chunk in RUB
let db = make_db().await.db;
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
.await
.unwrap();
}
let scenario = DbScenario {
scenario_name: "Data in read buffer".into(),
db,
};
vec![scenario]
}
/// This function loads one chunk of lp data into different scenarios that simulates
/// the data life cycle.
///

View File

@ -1,7 +1,10 @@
//! Tests for the table_names implementation
use arrow::datatypes::DataType;
use internal_types::{schema::builder::SchemaBuilder, selection::Selection};
use internal_types::{
schema::{builder::SchemaBuilder, sort::SortKey, TIME_COLUMN_NAME},
selection::Selection,
};
use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use super::scenarios::*;
@ -57,7 +60,7 @@ macro_rules! run_table_schema_test_case {
}
#[tokio::test]
async fn list_schema_cpu_all() {
async fn list_schema_cpu_all_mub() {
// we expect columns to come out in lexographic order by name
let expected_schema = SchemaBuilder::new()
.tag("region")
@ -66,7 +69,59 @@ async fn list_schema_cpu_all() {
.build()
.unwrap();
run_table_schema_test_case!(TwoMeasurements {}, Selection::All, "cpu", expected_schema);
run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
Selection::All,
"cpu",
expected_schema
);
}
#[tokio::test]
async fn list_schema_cpu_all_rub() {
// we expect columns to come out in lexographic order by name
// The schema of RUB includes sort key
let mut sort_key = SortKey::with_capacity(2);
sort_key.push("region", Default::default());
sort_key.push(TIME_COLUMN_NAME, Default::default());
let expected_schema = SchemaBuilder::new()
.tag("region")
.timestamp()
.field("user", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
run_table_schema_test_case!(
TwoMeasurementsRubScenario {},
Selection::All,
"cpu",
expected_schema
);
}
#[tokio::test]
async fn list_schema_cpu_all_rub_set_sort_key() {
// The schema of RUB includes sort key
let mut sort_key = SortKey::with_capacity(2);
sort_key.push("region", Default::default());
sort_key.push(TIME_COLUMN_NAME, Default::default());
let expected_schema = SchemaBuilder::new()
.tag("region")
.timestamp()
.field("user", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
run_table_schema_test_case!(
TwoMeasurementsRubScenario {},
Selection::All,
"cpu",
expected_schema
);
// Now set
}
#[tokio::test]
@ -79,7 +134,12 @@ async fn list_schema_disk_all() {
.build()
.unwrap();
run_table_schema_test_case!(TwoMeasurements {}, Selection::All, "disk", expected_schema);
run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
Selection::All,
"disk",
expected_schema
);
}
#[tokio::test]
@ -93,7 +153,12 @@ async fn list_schema_cpu_selection() {
// Pick an order that is not lexographic
let selection = Selection::Some(&["user", "region"]);
run_table_schema_test_case!(TwoMeasurements {}, selection, "cpu", expected_schema);
run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
selection,
"cpu",
expected_schema
);
}
#[tokio::test]
@ -108,7 +173,12 @@ async fn list_schema_disk_selection() {
// Pick an order that is not lexographic
let selection = Selection::Some(&["time", "bytes"]);
run_table_schema_test_case!(TwoMeasurements {}, selection, "disk", expected_schema);
run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
selection,
"disk",
expected_schema
);
}
#[tokio::test]
@ -122,9 +192,200 @@ async fn list_schema_location_all() {
.unwrap();
run_table_schema_test_case!(
TwoMeasurementsUnsignedType {},
TwoMeasurementsUnsignedTypeMubScenario {},
Selection::All,
"restaurant",
expected_schema
);
}
#[tokio::test]
async fn test_set_sort_key_valid_same_order() {
// Build the expected schema with sort key
let mut sort_key = SortKey::with_capacity(3);
sort_key.push("tag1", Default::default());
sort_key.push("time", Default::default());
sort_key.push("tag2", Default::default());
let expected_schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
// The same schema without sort key
let mut schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build()
.unwrap();
schema.set_sort_key(&sort_key);
assert_eq!(
expected_schema, schema,
"Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n",
expected_schema, schema
);
}
#[tokio::test]
async fn test_set_sort_key_valid_different_order() {
// Build the expected schema with sort key "time, tag2, tag1"
let mut sort_key = SortKey::with_capacity(3);
sort_key.push("time", Default::default());
sort_key.push("tag2", Default::default());
sort_key.push("tag1", Default::default());
let expected_schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
// The same schema without sort key
let mut schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build()
.unwrap();
schema.set_sort_key(&sort_key);
assert_eq!(
expected_schema, schema,
"Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n",
expected_schema, schema
);
}
#[tokio::test]
async fn test_set_sort_key_valid_subset() {
// Build the expected schema with sort key "time, tag1"
let mut sort_key = SortKey::with_capacity(2);
sort_key.push("time", Default::default());
sort_key.push("tag1", Default::default());
let expected_schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
// The same schema without sort key
let mut schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build()
.unwrap();
// set sort key for it
schema.set_sort_key(&sort_key);
assert_eq!(
expected_schema, schema,
"Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n",
expected_schema, schema
);
}
#[tokio::test]
async fn test_set_sort_key_valid_subset_of_fully_set() {
// Build sort key "tag1, time, tag2"
let mut sort_key = SortKey::with_capacity(3);
sort_key.push("tag1", Default::default());
sort_key.push("time", Default::default());
sort_key.push("tag2", Default::default());
// The schema with sort key
let mut schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
// reset sort key to "tag2, time"
let mut sort_key = SortKey::with_capacity(2);
sort_key.push("tag2", Default::default());
sort_key.push("time", Default::default());
schema.set_sort_key(&sort_key);
// Expected schema with "tag2, time" as sort key
let expected_schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
assert_eq!(
expected_schema, schema,
"Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n",
expected_schema, schema
);
}
#[tokio::test]
async fn test_set_sort_key_invalid_not_exist() {
// Build the expected schema with sort key "time"
let mut sort_key = SortKey::with_capacity(1);
sort_key.push("time", Default::default());
let expected_schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
// The same schema without sort key
let mut schema = SchemaBuilder::new()
.tag("tag1")
.timestamp()
.tag("tag2")
.field("field_int", DataType::Int64)
.field("field_float", DataType::Float64)
.build()
.unwrap();
// Nuild sort key that include valid "time" and invalid "no_tag"
let mut sort_key = SortKey::with_capacity(2);
sort_key.push("time", Default::default());
// invalid column
sort_key.push("not_tag", Default::default());
// The invalid key will be ignored in this function
schema.set_sort_key(&sort_key);
assert_eq!(
expected_schema, schema,
"Schema mismatch \nExpected:\n{:#?}\nActual:\n{:#?}\n",
expected_schema, schema
);
}

View File

@ -609,31 +609,42 @@ impl CatalogChunk {
/// Set the chunk in the Moved state, setting the underlying
/// storage handle to db, and discarding the underlying mutable buffer
/// storage.
pub fn set_moved(&mut self, chunk: Arc<RBChunk>) -> Result<()> {
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> {
match &mut self.stage {
ChunkStage::Frozen { representation, .. } => match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
self.metrics
.state
.inc_with_labels(&[KeyValue::new("state", "moved")]);
ChunkStage::Frozen {
meta,
representation,
..
} => {
// after moved, the chunk is sorted and its schema needs to get updated
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::new(schema),
});
self.metrics.immutable_chunk_size.observe_with_labels(
chunk.size() as f64,
&[KeyValue::new("state", "moved")],
);
match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
self.metrics
.state
.inc_with_labels(&[KeyValue::new("state", "moved")]);
*representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
Ok(())
self.metrics.immutable_chunk_size.observe_with_labels(
chunk.size() as f64,
&[KeyValue::new("state", "moved")],
);
*representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
Ok(())
}
ChunkStageFrozenRepr::ReadBuffer(_) => InternalChunkState {
chunk: self.addr.clone(),
operation: "setting moved",
expected: "Frozen with MutableBufferSnapshot",
actual: "Frozen with ReadBuffer",
}
.fail(),
}
ChunkStageFrozenRepr::ReadBuffer(_) => InternalChunkState {
chunk: self.addr.clone(),
operation: "setting moved",
expected: "Frozen with MutableBufferSnapshot",
actual: "Frozen with ReadBuffer",
}
.fail(),
},
}
_ => {
unexpected_state!(self, "setting moved", "Moving", self.stage)
}

View File

@ -432,7 +432,11 @@ impl QueryChunk for DbChunk {
}
}
// TODOs: return the right value. For now the chunk is assumed to be not sorted
/// Returns true if the chunk is sorted on its pk
/// Since data is compacted prior being moved to RUBs, data in RUBs and OBs
/// should be sorted on their PK as the results of compacting.
/// However, since we current sorted data based on their cardinality (see compute_sort_key),
/// 2 different chunks may be sorted on different order of key columns.
fn is_sorted_on_pk(&self) -> bool {
match &self.state {
State::MutableBuffer { .. } => false,

View File

@ -51,7 +51,7 @@ pub fn move_chunk_to_read_buffer(
let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
// Cannot move query_chunks as the sort key borrows the column names
let (_schema, plan) =
let (schema, plan) =
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?;
@ -63,7 +63,7 @@ pub fn move_chunk_to_read_buffer(
// update the catalog to say we are done processing
guard
.set_moved(Arc::new(rb_chunk))
.set_moved(Arc::new(rb_chunk), schema)
.expect("failed to move chunk");
debug!(chunk=%addr, "chunk marked MOVED. loading complete");