diff --git a/internal_types/src/schema.rs b/internal_types/src/schema.rs
index 8c92ddbddc..d914f8f5c5 100644
--- a/internal_types/src/schema.rs
+++ b/internal_types/src/schema.rs
@@ -219,7 +219,7 @@ impl Schema {
     }
 
     /// Set the order of sort columns to the primary key of the schema
-    pub fn set_sort_key (&mut self, sort_key: &SortKey<'_>) {
+    pub fn set_sort_key(&mut self, sort_key: &SortKey<'_>) {
         let fields = self.inner.fields();
 
         // create new_fields that are the fields with their sort keys set
@@ -228,16 +228,28 @@ impl Schema {
             .map(|field| {
                 let mut new_field = field.clone();
                 if let Some(sort) = sort_key.get(field.name()) {
-                    set_field_metadata(&mut new_field, None, Some(sort));
+                    let mut meta = std::collections::BTreeMap::new();
+                    // New sort key
+                    meta.insert(COLUMN_SORT_METADATA_KEY.to_string(), sort.to_string());
+                    // Keep other metadata
+                    if let Some(metadata) = field.metadata().to_owned() {
+                        metadata.iter().for_each(|(key, value)| {
+                            if key.ne(&COLUMN_SORT_METADATA_KEY.to_string()) {
+                                meta.insert(key.clone(), value.clone());
+                            }
+                        })
+                    }
+
+                    new_field.set_metadata(Some(meta))
                 }
                 new_field
-            } )
+            })
             .collect();
 
         let new_meta = self.inner.metadata().clone();
         let new_schema = ArrowSchema::new_with_metadata(new_fields, new_meta);
         self.inner = Arc::new(new_schema);
-     }
+    }
 
     /// Provide a reference to the underlying Arrow Schema object
     pub fn inner(&self) -> &ArrowSchemaRef {
diff --git a/internal_types/src/schema/builder.rs b/internal_types/src/schema/builder.rs
index 93257188c6..0f6eb81c38 100644
--- a/internal_types/src/schema/builder.rs
+++ b/internal_types/src/schema/builder.rs
@@ -171,6 +171,7 @@ impl SchemaBuilder {
         arrow_type: ArrowDataType,
     ) -> &mut Self {
         let field = ArrowField::new(column_name, arrow_type, nullable);
+
         self.fields.push((field, column_type));
         self
     }
diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs
index 022deb46fa..aecdb770ba 100644
--- a/query/src/frontend/reorg.rs
+++ b/query/src/frontend/reorg.rs
@@ -8,7 +8,7 @@ use datafusion::{
 };
 use datafusion_util::AsExpr;
 use internal_types::schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 
 use crate::{
     exec::make_stream_split,
@@ -76,8 +76,11 @@ impl ReorgPlanner {
         } = self.scan_and_sort_plan(chunks, output_sort.clone())?;
 
         let mut schema = provider.iox_schema();
+
+        trace!("Schema before setting sort key: {:#?}", schema);
         // Set the sort_key of the schema to the compacted chunk's sort key
-        schema.set_sort_key(&output_sort);
+        schema.set_sort_key(&output_sort);
+        trace!("Schema after setting sort key: {:#?}", schema);
 
         let plan = plan_builder.build().context(BuildingPlan)?;
diff --git a/query/src/provider.rs b/query/src/provider.rs
index d341050329..d25e90776a 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -678,7 +678,6 @@ impl Deduplicater {
         chunk: Arc<C>,
         input: Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // Todo: check there is sort key and it matches with the given one
         //let sort_key = schema.sort_key();
 
         if chunk.is_sorted_on_pk() {
@@ -687,7 +686,7 @@ impl Deduplicater {
 
         let schema = chunk.schema();
         let sort_exprs = arrow_pk_sort_exprs(schema.primary_key());
-        
+
         // Create SortExec operator
         Ok(Arc::new(
             SortExec::try_new(sort_exprs, input).context(InternalSort)?,
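The heart of the `set_sort_key` change is the metadata merge: write the new sort entry first, then carry over every other pre-existing key so unrelated field metadata survives. A minimal standalone sketch of that merge; the key string and the `"0/asc"` sort encoding are illustrative placeholders, not the real constants:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the real COLUMN_SORT_METADATA_KEY constant.
const COLUMN_SORT_METADATA_KEY: &str = "iox::column::sort";

/// Merge per-field metadata the way the new `set_sort_key` does: insert the
/// new sort entry, then copy every pre-existing key except a stale sort entry.
fn merge_sort_metadata(
    existing: Option<&BTreeMap<String, String>>,
    sort: &str,
) -> BTreeMap<String, String> {
    let mut meta = BTreeMap::new();
    meta.insert(COLUMN_SORT_METADATA_KEY.to_string(), sort.to_string());
    if let Some(existing) = existing {
        for (key, value) in existing {
            if key != COLUMN_SORT_METADATA_KEY {
                meta.insert(key.clone(), value.clone());
            }
        }
    }
    meta
}

fn main() {
    let mut old = BTreeMap::new();
    old.insert("units".to_string(), "bytes".to_string());
    old.insert(COLUMN_SORT_METADATA_KEY.to_string(), "stale".to_string());

    let merged = merge_sort_metadata(Some(&old), "0/asc");
    // The stale sort entry is replaced; unrelated metadata survives.
    assert_eq!(merged[COLUMN_SORT_METADATA_KEY], "0/asc");
    assert_eq!(merged["units"], "bytes");
}
```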
diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs
index 0429172b2e..7c31dfe445 100644
--- a/query_tests/src/scenarios.rs
+++ b/query_tests/src/scenarios.rs
@@ -185,6 +185,40 @@ impl DbSetup for NoData {
 
 /// Data for two measurements in a single mutable buffer chunk
 #[derive(Debug)]
+pub struct TwoMeasurementsMubScenario {}
+#[async_trait]
+impl DbSetup for TwoMeasurementsMubScenario {
+    async fn make(&self) -> Vec<DbScenario> {
+        let lp_lines = vec![
+            "cpu,region=west user=23.2 100",
+            "cpu,region=west user=21.0 150",
+            "disk,region=east bytes=99i 200",
+        ];
+
+        make_one_chunk_mub_scenario(&lp_lines.join("\n")).await
+    }
+}
+
+/// Data for two measurements in a single read buffer chunk
+#[derive(Debug)]
+pub struct TwoMeasurementsRubScenario {}
+#[async_trait]
+impl DbSetup for TwoMeasurementsRubScenario {
+    async fn make(&self) -> Vec<DbScenario> {
+        let partition_key = "1970-01-01T00";
+
+        let lp_lines = vec![
+            "cpu,region=west user=23.2 100",
+            "cpu,region=west user=21.0 150",
+            "disk,region=east bytes=99i 200",
+        ];
+
+        make_one_chunk_rub_scenario(partition_key, &lp_lines.join("\n")).await
+    }
+}
+
+/// Data for two measurements in different chunk scenarios
+#[derive(Debug)]
 pub struct TwoMeasurements {}
 #[async_trait]
 impl DbSetup for TwoMeasurements {
@@ -201,6 +235,22 @@ impl DbSetup for TwoMeasurements {
     }
 }
 
+#[derive(Debug)]
+pub struct TwoMeasurementsUnsignedTypeMubScenario {}
+#[async_trait]
+impl DbSetup for TwoMeasurementsUnsignedTypeMubScenario {
+    async fn make(&self) -> Vec<DbScenario> {
+        let lp_lines = vec![
+            "restaurant,town=andover count=40000u 100",
+            "restaurant,town=reading count=632u 120",
+            "school,town=reading count=17u 150",
+            "school,town=andover count=25u 160",
+        ];
+
+        make_one_chunk_mub_scenario(&lp_lines.join("\n")).await
+    }
+}
+
 #[derive(Debug)]
 pub struct TwoMeasurementsUnsignedType {}
 #[async_trait]
@@ -587,6 +637,45 @@ impl DbSetup for EndToEndTest {
     }
 }
 
+/// This function loads one chunk of lp data into MUB only
+///
+pub(crate) async fn make_one_chunk_mub_scenario(data: &str) -> Vec<DbScenario> {
+    // Scenario 1: One open chunk in MUB
+    let db = make_db().await.db;
+    write_lp(&db, data).await;
+    let scenario = DbScenario {
+        scenario_name: "Data in open chunk of mutable buffer".into(),
+        db,
+    };
+
+    vec![scenario]
+}
+
+/// This function loads one chunk of lp data into RUB only
+///
+pub(crate) async fn make_one_chunk_rub_scenario(
+    partition_key: &str,
+    data: &str,
+) -> Vec<DbScenario> {
+    // Scenario 1: One closed chunk in RUB
+    let db = make_db().await.db;
+    let table_names = write_lp(&db, data).await;
+    for table_name in &table_names {
+        db.rollover_partition(&table_name, partition_key)
+            .await
+            .unwrap();
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
+            .await
+            .unwrap();
+    }
+    let scenario = DbScenario {
+        scenario_name: "Data in read buffer".into(),
+        db,
+    };
+
+    vec![scenario]
+}
+
 /// This function loads one chunk of lp data into different scenarios that simulate
 /// the data life cycle.
 ///
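The two helpers keep scenario construction in one place: the MUB variant leaves one open chunk in the mutable buffer, while the RUB variant rolls the partition over and moves the chunk to the read buffer, which sorts it on its primary key. A minimal sketch of how a test might drive them, assuming it lives in `query_tests/src/scenarios.rs` beside the helpers; the line-protocol rows are illustrative:

```rust
// Sketch only: `make_one_chunk_mub_scenario` and `make_one_chunk_rub_scenario`
// are the helpers defined above.
#[tokio::test]
async fn example_mub_vs_rub_scenarios() {
    let lp = vec![
        "cpu,region=west user=23.2 100",
        "disk,region=east bytes=99i 200",
    ]
    .join("\n");

    // One open chunk in the mutable buffer: the schema carries no sort key yet.
    let mub = make_one_chunk_mub_scenario(&lp).await;
    assert_eq!(mub.len(), 1);

    // Rolled over and moved to the read buffer: the chunk is sorted on its
    // primary key, so its schema now carries the sort-key metadata.
    let rub = make_one_chunk_rub_scenario("1970-01-01T00", &lp).await;
    assert_eq!(rub.len(), 1);
}
```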
run_table_schema_test_case { } #[tokio::test] -async fn list_schema_cpu_all() { +async fn list_schema_cpu_all_mub() { // we expect columns to come out in lexographic order by name let expected_schema = SchemaBuilder::new() .tag("region") @@ -66,7 +69,35 @@ async fn list_schema_cpu_all() { .build() .unwrap(); - run_table_schema_test_case!(TwoMeasurements {}, Selection::All, "cpu", expected_schema); + run_table_schema_test_case!( + TwoMeasurementsMubScenario {}, + Selection::All, + "cpu", + expected_schema + ); +} + +#[tokio::test] +async fn list_schema_cpu_all_rub() { + // we expect columns to come out in lexographic order by name + // The est is o RUb so the schema includes sort key + let mut sort_key = SortKey::with_capacity(2); + sort_key.push("region", Default::default()); + sort_key.push(TIME_COLUMN_NAME, Default::default()); + + let expected_schema = SchemaBuilder::new() + .tag("region") + .timestamp() + .field("user", DataType::Float64) + .build_with_sort_key(&sort_key) + .unwrap(); + + run_table_schema_test_case!( + TwoMeasurementsRubScenario {}, + Selection::All, + "cpu", + expected_schema + ); } #[tokio::test] @@ -79,7 +110,12 @@ async fn list_schema_disk_all() { .build() .unwrap(); - run_table_schema_test_case!(TwoMeasurements {}, Selection::All, "disk", expected_schema); + run_table_schema_test_case!( + TwoMeasurementsMubScenario {}, + Selection::All, + "disk", + expected_schema + ); } #[tokio::test] @@ -93,7 +129,12 @@ async fn list_schema_cpu_selection() { // Pick an order that is not lexographic let selection = Selection::Some(&["user", "region"]); - run_table_schema_test_case!(TwoMeasurements {}, selection, "cpu", expected_schema); + run_table_schema_test_case!( + TwoMeasurementsMubScenario {}, + selection, + "cpu", + expected_schema + ); } #[tokio::test] @@ -108,7 +149,12 @@ async fn list_schema_disk_selection() { // Pick an order that is not lexographic let selection = Selection::Some(&["time", "bytes"]); - run_table_schema_test_case!(TwoMeasurements {}, selection, "disk", expected_schema); + run_table_schema_test_case!( + TwoMeasurementsMubScenario {}, + selection, + "disk", + expected_schema + ); } #[tokio::test] @@ -122,7 +168,7 @@ async fn list_schema_location_all() { .unwrap(); run_table_schema_test_case!( - TwoMeasurementsUnsignedType {}, + TwoMeasurementsUnsignedTypeMubScenario {}, Selection::All, "restaurant", expected_schema diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index a750cb9d6b..5245405903 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -613,13 +613,16 @@ impl CatalogChunk { /// storage. pub fn set_moved(&mut self, chunk: Arc, schema: Schema) -> Result<()> { match &mut self.stage { - ChunkStage::Frozen {meta, representation, .. } => { + ChunkStage::Frozen { + meta, + representation, + .. 
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index a750cb9d6b..5245405903 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -613,13 +613,16 @@ impl CatalogChunk {
     /// storage.
     pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> {
         match &mut self.stage {
-            ChunkStage::Frozen {meta, representation, .. } => {
+            ChunkStage::Frozen {
+                meta,
+                representation,
+                ..
+            } => {
                 // after the chunk is moved, it is sorted and its schema needs to be updated
-                *meta = Arc::new(ChunkMetadata{
+                *meta = Arc::new(ChunkMetadata {
                     table_summary: Arc::clone(&meta.table_summary),
                     schema: Arc::new(schema),
-                });
-
+                });
 
                 match &representation {
                     ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
@@ -643,7 +646,7 @@ impl CatalogChunk {
                     }
                     .fail(),
                 }
-            },
+            }
             _ => {
                 unexpected_state!(self, "setting moved", "Moving", self.stage)
             }
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index b1c4bc40e3..88ebdf0376 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -454,4 +454,4 @@ impl QueryChunkMeta for DbChunk {
     fn schema(&self) -> Arc<Schema> {
         Arc::clone(&self.meta.schema)
     }
-}
\ No newline at end of file
+}
diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs
index 048ae68d01..7c8ed44749 100644
--- a/server/src/db/lifecycle/move_chunk.rs
+++ b/server/src/db/lifecycle/move_chunk.rs
@@ -72,8 +72,6 @@ pub fn move_chunk_to_read_buffer(
         rb_chunk.upsert_table(batch?)
     }
 
-
-
    // Can drop and re-acquire as lifecycle action prevents concurrent modification
    let mut guard = chunk.write();
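Taken together, `move_chunk_to_read_buffer` produces the sorted chunk and `set_moved` installs its updated schema in the catalog. A self-contained model of that transition; every type here is an illustrative stand-in, not the real `server::db::catalog` type:

```rust
use std::sync::Arc;

// Toy model: `set_moved` only succeeds on a Frozen chunk, and rebuilds the
// metadata so the now-sorted schema is what queries observe.
#[allow(dead_code)]
enum ChunkStage {
    Open,
    Frozen { meta: Arc<ChunkMetadata> },
    Persisted,
}

struct ChunkMetadata {
    // Stands in for Arc<Schema>.
    schema: Arc<String>,
}

fn set_moved(stage: &mut ChunkStage, sorted_schema: &str) -> Result<(), String> {
    match stage {
        ChunkStage::Frozen { meta } => {
            // After the move the chunk is sorted; swap in the updated schema.
            *meta = Arc::new(ChunkMetadata {
                schema: Arc::new(sorted_schema.to_string()),
            });
            Ok(())
        }
        _ => Err("setting moved: expected a Frozen chunk".to_string()),
    }
}

fn main() {
    let mut stage = ChunkStage::Frozen {
        meta: Arc::new(ChunkMetadata {
            schema: Arc::new("unsorted".to_string()),
        }),
    };

    set_moved(&mut stage, "sorted on (region, time)").unwrap();

    if let ChunkStage::Frozen { meta } = &stage {
        assert_eq!(meta.schema.as_str(), "sorted on (region, time)");
    }
}
```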