fix: store sort key correctly inthe schema. Update tests to reflect it

pull/24376/head
Nga Tran 2021-07-07 15:55:23 -04:00
parent 8dfc3bb6bc
commit d3c4f8c249
9 changed files with 174 additions and 23 deletions

View File

@ -219,7 +219,7 @@ impl Schema {
} }
/// Set the order of sort columns to the primary key id the schema /// Set the order of sort columns to the primary key id the schema
pub fn set_sort_key (&mut self, sort_key: &SortKey<'_>) { pub fn set_sort_key(&mut self, sort_key: &SortKey<'_>) {
let fields = self.inner.fields(); let fields = self.inner.fields();
// create a new_fields that are the fields with their sort keys set // create a new_fields that are the fields with their sort keys set
@ -228,16 +228,28 @@ impl Schema {
.map(|field| { .map(|field| {
let mut new_field = field.clone(); let mut new_field = field.clone();
if let Some(sort) = sort_key.get(field.name()) { if let Some(sort) = sort_key.get(field.name()) {
set_field_metadata(&mut new_field, None, Some(sort)); let mut meta = std::collections::BTreeMap::new();
// New sort key
meta.insert(COLUMN_SORT_METADATA_KEY.to_string(), sort.to_string());
// Keep other meta data
if let Some(metadata) = field.metadata().to_owned() {
metadata.iter().for_each(|(key, value)| {
if key.ne(&COLUMN_SORT_METADATA_KEY.to_string()) {
meta.insert(key.clone(), value.clone());
}
})
}
new_field.set_metadata(Some(meta))
} }
new_field new_field
} ) })
.collect(); .collect();
let new_meta = self.inner.metadata().clone(); let new_meta = self.inner.metadata().clone();
let new_schema = ArrowSchema::new_with_metadata(new_fields, new_meta); let new_schema = ArrowSchema::new_with_metadata(new_fields, new_meta);
self.inner = Arc::new(new_schema); self.inner = Arc::new(new_schema);
} }
/// Provide a reference to the underlying Arrow Schema object /// Provide a reference to the underlying Arrow Schema object
pub fn inner(&self) -> &ArrowSchemaRef { pub fn inner(&self) -> &ArrowSchemaRef {

View File

@ -171,6 +171,7 @@ impl SchemaBuilder {
arrow_type: ArrowDataType, arrow_type: ArrowDataType,
) -> &mut Self { ) -> &mut Self {
let field = ArrowField::new(column_name, arrow_type, nullable); let field = ArrowField::new(column_name, arrow_type, nullable);
self.fields.push((field, column_type)); self.fields.push((field, column_type));
self self
} }

View File

@ -8,7 +8,7 @@ use datafusion::{
}; };
use datafusion_util::AsExpr; use datafusion_util::AsExpr;
use internal_types::schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; use internal_types::schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use observability_deps::tracing::debug; use observability_deps::tracing::{debug, trace};
use crate::{ use crate::{
exec::make_stream_split, exec::make_stream_split,
@ -76,8 +76,11 @@ impl ReorgPlanner {
} = self.scan_and_sort_plan(chunks, output_sort.clone())?; } = self.scan_and_sort_plan(chunks, output_sort.clone())?;
let mut schema = provider.iox_schema(); let mut schema = provider.iox_schema();
trace!("Schema before setting sort key: {:#?}", schema);
// Set the sort_key of the schema to the compacted chunk's sort key // Set the sort_key of the schema to the compacted chunk's sort key
schema.set_sort_key(&output_sort); schema.set_sort_key(&output_sort);
trace!("Schema after setting sort key: {:#?}", schema);
let plan = plan_builder.build().context(BuildingPlan)?; let plan = plan_builder.build().context(BuildingPlan)?;

View File

@ -678,7 +678,6 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
chunk: Arc<C>, chunk: Arc<C>,
input: Arc<dyn ExecutionPlan>, input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> { ) -> Result<Arc<dyn ExecutionPlan>> {
// Todo: check there is sort key and it matches with the given one // Todo: check there is sort key and it matches with the given one
//let sort_key = schema.sort_key(); //let sort_key = schema.sort_key();
if chunk.is_sorted_on_pk() { if chunk.is_sorted_on_pk() {
@ -687,7 +686,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
let schema = chunk.schema(); let schema = chunk.schema();
let sort_exprs = arrow_pk_sort_exprs(schema.primary_key()); let sort_exprs = arrow_pk_sort_exprs(schema.primary_key());
// Create SortExec operator // Create SortExec operator
Ok(Arc::new( Ok(Arc::new(
SortExec::try_new(sort_exprs, input).context(InternalSort)?, SortExec::try_new(sort_exprs, input).context(InternalSort)?,

View File

@ -185,6 +185,40 @@ impl DbSetup for NoData {
/// Two measurements data in a single mutable buffer chunk /// Two measurements data in a single mutable buffer chunk
#[derive(Debug)] #[derive(Debug)]
pub struct TwoMeasurementsMubScenario {}
#[async_trait]
impl DbSetup for TwoMeasurementsMubScenario {
async fn make(&self) -> Vec<DbScenario> {
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
make_one_chunk_mub_scenario(&lp_lines.join("\n")).await
}
}
/// Two measurements data in a single read buffer chunk
#[derive(Debug)]
pub struct TwoMeasurementsRubScenario {}
#[async_trait]
impl DbSetup for TwoMeasurementsRubScenario {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
make_one_chunk_rub_scenario(partition_key, &lp_lines.join("\n")).await
}
}
/// Two measurements data in different chunk scenarios
#[derive(Debug)]
pub struct TwoMeasurements {} pub struct TwoMeasurements {}
#[async_trait] #[async_trait]
impl DbSetup for TwoMeasurements { impl DbSetup for TwoMeasurements {
@ -201,6 +235,22 @@ impl DbSetup for TwoMeasurements {
} }
} }
#[derive(Debug)]
pub struct TwoMeasurementsUnsignedTypeMubScenario {}
#[async_trait]
impl DbSetup for TwoMeasurementsUnsignedTypeMubScenario {
async fn make(&self) -> Vec<DbScenario> {
let lp_lines = vec![
"restaurant,town=andover count=40000u 100",
"restaurant,town=reading count=632u 120",
"school,town=reading count=17u 150",
"school,town=andover count=25u 160",
];
make_one_chunk_mub_scenario(&lp_lines.join("\n")).await
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct TwoMeasurementsUnsignedType {} pub struct TwoMeasurementsUnsignedType {}
#[async_trait] #[async_trait]
@ -587,6 +637,45 @@ impl DbSetup for EndToEndTest {
} }
} }
/// This function loads one chunk of lp data into MUB only
///
pub(crate) async fn make_one_chunk_mub_scenario(data: &str) -> Vec<DbScenario> {
// Scenario 1: One open chunk in MUB
let db = make_db().await.db;
write_lp(&db, data).await;
let scenario = DbScenario {
scenario_name: "Data in open chunk of mutable buffer".into(),
db,
};
vec![scenario]
}
/// This function loads one chunk of lp data into RUB only
///
pub(crate) async fn make_one_chunk_rub_scenario(
partition_key: &str,
data: &str,
) -> Vec<DbScenario> {
// Scenario 1: One closed chunk in RUB
let db = make_db().await.db;
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
db.rollover_partition(&table_name, partition_key)
.await
.unwrap();
db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
.await
.unwrap();
}
let scenario = DbScenario {
scenario_name: "Data in read buffer".into(),
db,
};
vec![scenario]
}
/// This function loads one chunk of lp data into different scenarios that simulates /// This function loads one chunk of lp data into different scenarios that simulates
/// the data life cycle. /// the data life cycle.
/// ///

View File

@ -1,7 +1,10 @@
//! Tests for the table_names implementation //! Tests for the table_names implementation
use arrow::datatypes::DataType; use arrow::datatypes::DataType;
use internal_types::{schema::builder::SchemaBuilder, selection::Selection}; use internal_types::{
schema::{builder::SchemaBuilder, sort::SortKey, TIME_COLUMN_NAME},
selection::Selection,
};
use query::{QueryChunk, QueryChunkMeta, QueryDatabase}; use query::{QueryChunk, QueryChunkMeta, QueryDatabase};
use super::scenarios::*; use super::scenarios::*;
@ -57,7 +60,7 @@ macro_rules! run_table_schema_test_case {
} }
#[tokio::test] #[tokio::test]
async fn list_schema_cpu_all() { async fn list_schema_cpu_all_mub() {
// we expect columns to come out in lexographic order by name // we expect columns to come out in lexographic order by name
let expected_schema = SchemaBuilder::new() let expected_schema = SchemaBuilder::new()
.tag("region") .tag("region")
@ -66,7 +69,35 @@ async fn list_schema_cpu_all() {
.build() .build()
.unwrap(); .unwrap();
run_table_schema_test_case!(TwoMeasurements {}, Selection::All, "cpu", expected_schema); run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
Selection::All,
"cpu",
expected_schema
);
}
#[tokio::test]
async fn list_schema_cpu_all_rub() {
// we expect columns to come out in lexographic order by name
// The est is o RUb so the schema includes sort key
let mut sort_key = SortKey::with_capacity(2);
sort_key.push("region", Default::default());
sort_key.push(TIME_COLUMN_NAME, Default::default());
let expected_schema = SchemaBuilder::new()
.tag("region")
.timestamp()
.field("user", DataType::Float64)
.build_with_sort_key(&sort_key)
.unwrap();
run_table_schema_test_case!(
TwoMeasurementsRubScenario {},
Selection::All,
"cpu",
expected_schema
);
} }
#[tokio::test] #[tokio::test]
@ -79,7 +110,12 @@ async fn list_schema_disk_all() {
.build() .build()
.unwrap(); .unwrap();
run_table_schema_test_case!(TwoMeasurements {}, Selection::All, "disk", expected_schema); run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
Selection::All,
"disk",
expected_schema
);
} }
#[tokio::test] #[tokio::test]
@ -93,7 +129,12 @@ async fn list_schema_cpu_selection() {
// Pick an order that is not lexographic // Pick an order that is not lexographic
let selection = Selection::Some(&["user", "region"]); let selection = Selection::Some(&["user", "region"]);
run_table_schema_test_case!(TwoMeasurements {}, selection, "cpu", expected_schema); run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
selection,
"cpu",
expected_schema
);
} }
#[tokio::test] #[tokio::test]
@ -108,7 +149,12 @@ async fn list_schema_disk_selection() {
// Pick an order that is not lexographic // Pick an order that is not lexographic
let selection = Selection::Some(&["time", "bytes"]); let selection = Selection::Some(&["time", "bytes"]);
run_table_schema_test_case!(TwoMeasurements {}, selection, "disk", expected_schema); run_table_schema_test_case!(
TwoMeasurementsMubScenario {},
selection,
"disk",
expected_schema
);
} }
#[tokio::test] #[tokio::test]
@ -122,7 +168,7 @@ async fn list_schema_location_all() {
.unwrap(); .unwrap();
run_table_schema_test_case!( run_table_schema_test_case!(
TwoMeasurementsUnsignedType {}, TwoMeasurementsUnsignedTypeMubScenario {},
Selection::All, Selection::All,
"restaurant", "restaurant",
expected_schema expected_schema

View File

@ -613,13 +613,16 @@ impl CatalogChunk {
/// storage. /// storage.
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> { pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> {
match &mut self.stage { match &mut self.stage {
ChunkStage::Frozen {meta, representation, .. } => { ChunkStage::Frozen {
meta,
representation,
..
} => {
// after moved, the chunk is sorted and schema need to get updated // after moved, the chunk is sorted and schema need to get updated
*meta = Arc::new(ChunkMetadata{ *meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary), table_summary: Arc::clone(&meta.table_summary),
schema: Arc::new(schema), schema: Arc::new(schema),
}); });
match &representation { match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => { ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
@ -643,7 +646,7 @@ impl CatalogChunk {
} }
.fail(), .fail(),
} }
}, }
_ => { _ => {
unexpected_state!(self, "setting moved", "Moving", self.stage) unexpected_state!(self, "setting moved", "Moving", self.stage)
} }

View File

@ -454,4 +454,4 @@ impl QueryChunkMeta for DbChunk {
fn schema(&self) -> Arc<Schema> { fn schema(&self) -> Arc<Schema> {
Arc::clone(&self.meta.schema) Arc::clone(&self.meta.schema)
} }
} }

View File

@ -72,8 +72,6 @@ pub fn move_chunk_to_read_buffer(
rb_chunk.upsert_table(batch?) rb_chunk.upsert_table(batch?)
} }
// Can drop and re-acquire as lifecycle action prevents concurrent modification // Can drop and re-acquire as lifecycle action prevents concurrent modification
let mut guard = chunk.write(); let mut guard = chunk.write();