diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs
index 88a60e82c0..7e9736aeac 100644
--- a/compactor/src/parquet_file_combining.rs
+++ b/compactor/src/parquet_file_combining.rs
@@ -356,7 +356,7 @@ impl CompactPlanBuilder {
         ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
             .compact_plan(
                 Arc::from(partition.table.name.clone()),
-                Arc::clone(&merged_schema),
+                &merged_schema,
                 query_chunks,
                 sort_key.clone(),
             )
@@ -384,7 +384,7 @@ impl CompactPlanBuilder {
         ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
             .compact_plan(
                 Arc::from(partition.table.name.clone()),
-                Arc::clone(&merged_schema),
+                &merged_schema,
                 query_chunks,
                 sort_key.clone(),
             )
@@ -394,7 +394,7 @@ impl CompactPlanBuilder {
         ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
             .split_plan(
                 Arc::from(partition.table.name.clone()),
-                Arc::clone(&merged_schema),
+                &merged_schema,
                 query_chunks,
                 sort_key.clone(),
                 split_times,
             )
@@ -537,7 +537,7 @@ impl CompactPlanBuilder {
         let plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
             .compact_plan(
                 Arc::from(partition.table.name.clone()),
-                Arc::clone(&merged_schema),
+                &merged_schema,
                 query_chunks,
                 sort_key.clone(),
             )
@@ -769,7 +769,7 @@ fn to_queryable_parquet_chunk(
         .map(|sk| sk.filter_to(&pk, file.partition_id().get()));
     let file = Arc::new(ParquetFile::from(file));
-    let parquet_chunk = ParquetChunk::new(Arc::clone(&file), Arc::new(schema), store);
+    let parquet_chunk = ParquetChunk::new(Arc::clone(&file), schema, store);
     trace!(
         parquet_file_id=?file.id,
diff --git a/compactor/src/query.rs b/compactor/src/query.rs
index 27ce7cffb3..c34c4abcab 100644
@@ -60,7 +60,7 @@ impl QueryableParquetChunk {
         let delete_predicates = tombstones_to_delete_predicates(deletes);
         let summary = Arc::new(create_basic_summary(
             data.rows() as u64,
-            &data.schema(),
+            data.schema(),
             data.timestamp_min_max(),
         ));
         Self {
@@ -79,10 +79,10 @@ impl QueryableParquetChunk {
     }
     /// Merge schema of the given chunks
-    pub fn merge_schemas(chunks: &[Arc]) -> Arc<Schema> {
+    pub fn merge_schemas(chunks: &[Arc]) -> Schema {
         let mut merger = SchemaMerger::new();
         for chunk in chunks {
-            merger = merger.merge(&chunk.schema()).expect("schemas compatible");
+            merger = merger.merge(chunk.schema()).expect("schemas compatible");
         }
         merger.build()
     }
@@ -113,7 +113,7 @@ impl QueryChunkMeta for QueryableParquetChunk {
         Arc::clone(&self.summary)
     }
-    fn schema(&self) -> Arc<Schema> {
+    fn schema(&self) -> &Schema {
         self.data.schema()
     }
@@ -259,7 +259,7 @@ mod tests {
         let parquet_chunk = Arc::new(ParquetChunk::new(
             Arc::clone(&parquet_file),
-            Arc::new(table.schema().await),
+            table.schema().await,
             ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
         ));
diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs
index 79aca26dbe..8cba34b932 100644
@@ -176,7 +176,7 @@ impl ParquetFileWithTombstone {
             .as_ref()
             .map(|sk| sk.filter_to(&pk, self.partition_id.get()));
-        let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), Arc::new(schema), store);
+        let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), schema, store);
         trace!(
             parquet_file_id=?self.id,
diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs
index f89bb4bcc5..3961a93864 100644
@@ -227,8 +227,8 @@ where
     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll> {
+        cx: &mut Context<'_>,
+    ) -> Poll> {
         self.inner.poll_next_unpin(cx)
     }
 }
@@ -243,13 +243,13 @@ where
 }
 /// Create a SendableRecordBatchStream a RecordBatch
-pub fn stream_from_batch(schema: Arc<Schema>, batch: RecordBatch) -> SendableRecordBatchStream {
+pub fn stream_from_batch(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
     stream_from_batches(schema, vec![Arc::new(batch)])
 }
 /// Create a SendableRecordBatchStream from Vec of RecordBatches with the same schema
 pub fn stream_from_batches(
-    schema: Arc<Schema>,
+    schema: SchemaRef,
     batches: Vec>,
 ) -> SendableRecordBatchStream {
     if batches.is_empty() {
diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs
index 0813cdb393..62b10bffea 100644
@@ -109,7 +109,7 @@ pub(crate) async fn compact_persisting_batch(
         }
         None => {
             let sort_key = compute_sort_key(
-                batch.schema().as_ref(),
+                batch.schema(),
                 batch.record_batches().iter().map(|sb| sb.as_ref()),
             );
             // Use the sort key computed from the cardinality as the sort key for this parquet
@@ -141,7 +141,7 @@ pub(crate) async fn compact(
         .compact_plan(
             table_name.into(),
             data.schema(),
-            [data as Arc],
+            [Arc::clone(&data) as Arc],
             sort_key,
         )
         .context(LogicalPlanSnafu {})?;
@@ -525,7 +525,7 @@ mod tests {
         assert_eq!(expected_pk, pk);
         let sort_key =
-            compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
+            compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
         // compact
@@ -566,7 +566,7 @@ mod tests {
         assert_eq!(expected_pk, pk);
         let sort_key =
-            compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
+            compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
         // compact
@@ -612,7 +612,7 @@ mod tests {
         assert_eq!(expected_pk, pk);
         let sort_key =
-            compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
+            compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
         // compact
@@ -658,7 +658,7 @@ mod tests {
         assert_eq!(expected_pk, pk);
         let sort_key =
-            compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
+            compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
         // compact
@@ -708,7 +708,7 @@ mod tests {
         assert_eq!(expected_pk, pk);
         let sort_key =
-            compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
+            compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
         assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
         // compact
diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs
index a70eda8ad9..babb281aab 100644
@@ -60,7 +60,7 @@ pub(crate) struct QueryAdaptor {
     id: ChunkId,
     /// An interned schema for all [`RecordBatch`] in data.
-    schema: OnceCell<Arc<Schema>>,
+    schema: Schema,
     /// An interned table summary.
     summary: OnceCell>,
@@ -80,13 +80,14 @@ impl QueryAdaptor {
         // partitions - if there is a QueryAdaptor, it contains data.
assert!(data.iter().map(|b| b.num_rows()).sum::() > 0); + let schema = merge_record_batch_schemas(&data); Self { + schema, data, partition_id, // To return a value for debugging and make it consistent with ChunkId created in Compactor, // use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process. id: ChunkId::new(), - schema: OnceCell::default(), summary: OnceCell::default(), } } @@ -137,17 +138,14 @@ impl QueryChunkMeta for QueryAdaptor { Arc::new(create_basic_summary( self.data.iter().map(|b| b.num_rows()).sum::() as u64, - &self.schema(), + self.schema(), ts_min_max, )) })) } - fn schema(&self) -> Arc { - Arc::clone( - self.schema - .get_or_init(|| merge_record_batch_schemas(&self.data)), - ) + fn schema(&self) -> &Schema { + &self.schema } fn partition_sort_key(&self) -> Option<&SortKey> { diff --git a/ingester2/src/persist/compact.rs b/ingester2/src/persist/compact.rs index 2910aa2aad..84fabfcd55 100644 --- a/ingester2/src/persist/compact.rs +++ b/ingester2/src/persist/compact.rs @@ -68,7 +68,7 @@ pub(super) async fn compact_persisting_batch( } None => { let sort_key = compute_sort_key( - batch.schema().as_ref(), + batch.schema(), batch.record_batches().iter().map(|sb| sb.as_ref()), ); // Use the sort key computed from the cardinality as the sort key for this parquet @@ -85,7 +85,7 @@ pub(super) async fn compact_persisting_batch( .compact_plan( table_name.into(), batch.schema(), - [batch as Arc], + [Arc::clone(&batch) as Arc], data_sort_key.clone(), ) .unwrap(); @@ -460,7 +460,7 @@ mod tests { assert_eq!(expected_pk, pk); let sort_key = - compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -501,7 +501,7 @@ mod tests { assert_eq!(expected_pk, pk); let sort_key = - compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -547,7 +547,7 @@ mod tests { assert_eq!(expected_pk, pk); let sort_key = - compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"])); // compact @@ -594,7 +594,7 @@ mod tests { assert_eq!(expected_pk, pk); let sort_key = - compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact @@ -645,7 +645,7 @@ mod tests { assert_eq!(expected_pk, pk); let sort_key = - compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref())); + compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref())); assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"])); // compact diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs index 0d3a59489b..1c9a5a1271 100644 --- a/ingester2/src/persist/handle.rs +++ b/ingester2/src/persist/handle.rs @@ -339,7 +339,7 @@ impl PersistQueue for PersistHandle { }; // Build the persist task request. 
- let schema = data.schema(); + let schema = data.schema().clone(); let (r, notify) = PersistRequest::new(Arc::clone(&partition), data, permit, enqueued_at); match sort_key { diff --git a/ingester2/src/query_adaptor.rs b/ingester2/src/query_adaptor.rs index 6188bef3b9..38e1645391 100644 --- a/ingester2/src/query_adaptor.rs +++ b/ingester2/src/query_adaptor.rs @@ -39,7 +39,7 @@ pub struct QueryAdaptor { id: ChunkId, /// An interned schema for all [`RecordBatch`] in data. - schema: OnceCell>, + schema: Schema, /// An interned table summary. summary: OnceCell>, @@ -59,13 +59,14 @@ impl QueryAdaptor { // partitions - if there is a QueryAdaptor, it contains data. assert!(data.iter().map(|b| b.num_rows()).sum::() > 0); + let schema = merge_record_batch_schemas(&data); Self { data, partition_id, // To return a value for debugging and make it consistent with ChunkId created in Compactor, // use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process. id: ChunkId::new(), - schema: OnceCell::default(), + schema, summary: OnceCell::default(), } } @@ -116,17 +117,14 @@ impl QueryChunkMeta for QueryAdaptor { Arc::new(create_basic_summary( self.data.iter().map(|b| b.num_rows()).sum::() as u64, - &self.schema(), + self.schema(), ts_min_max, )) })) } - fn schema(&self) -> Arc { - Arc::clone( - self.schema - .get_or_init(|| merge_record_batch_schemas(&self.data)), - ) + fn schema(&self) -> &Schema { + &self.schema } fn partition_sort_key(&self) -> Option<&SortKey> { diff --git a/iox_arrow_flight/src/client.rs b/iox_arrow_flight/src/client.rs index 51a9a7ad67..03ff39af0a 100644 --- a/iox_arrow_flight/src/client.rs +++ b/iox_arrow_flight/src/client.rs @@ -1,3 +1,4 @@ +use arrow::datatypes::SchemaRef; /// Prototype "Flight Client" that handles underlying details of the flight protocol at a higher level /// Based on the "low level client" from IOx client: @@ -412,7 +413,7 @@ impl futures::Stream for FlightDataStream { /// streaming flight response. #[derive(Debug)] struct FlightStreamState { - schema: Arc, + schema: SchemaRef, dictionaries_by_field: HashMap, } @@ -431,7 +432,7 @@ impl DecodedFlightData { } } - pub fn new_schema(inner: FlightData, schema: Arc) -> Self { + pub fn new_schema(inner: FlightData, schema: SchemaRef) -> Self { Self { inner, payload: DecodedPayload::Schema(schema), @@ -458,7 +459,7 @@ pub enum DecodedPayload { None, /// A decoded Schema message - Schema(Arc), + Schema(SchemaRef), /// A decoded Record batch. 
RecordBatch(RecordBatch), diff --git a/iox_query/src/frontend.rs b/iox_query/src/frontend.rs index a1185aa016..a12c80bfd5 100644 --- a/iox_query/src/frontend.rs +++ b/iox_query/src/frontend.rs @@ -66,7 +66,7 @@ mod test { let ctx = IOxSessionContext::with_testing(); // Build a logical plan with deduplication - let scan_plan = ScanPlanBuilder::new(Arc::from("t"), schema, ctx.child_ctx("scan_plan")) + let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan")) .with_chunks(chunks) .build() .unwrap(); @@ -114,7 +114,7 @@ mod test { let ctx = IOxSessionContext::with_testing(); // Build a logical plan without deduplication - let scan_plan = ScanPlanBuilder::new(Arc::from("t"), schema, ctx.child_ctx("scan_plan")) + let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan")) .with_chunks(chunks) // force it to not deduplicate .enable_deduplication(false) @@ -178,7 +178,7 @@ mod test { let ctx = IOxSessionContext::with_testing(); // Build a logical plan without deduplication but sort - let scan_plan = ScanPlanBuilder::new(Arc::from("t"), schema, ctx.child_ctx("scan_plan")) + let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan")) .with_chunks(chunks) // force it to not deduplicate .enable_deduplication(false) @@ -230,7 +230,7 @@ mod test { // Use a split plan as it has StreamSplitExec, DeduplicateExec and IOxReadFilternode let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000]) + .split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000]) .expect("created compact plan"); let executor = Executor::new_testing(); @@ -361,7 +361,7 @@ mod test { extractor.inner } - fn test_chunks(overlapped: bool) -> (Arc, Vec>) { + fn test_chunks(overlapped: bool) -> (Schema, Vec>) { let max_time = if overlapped { 70000 } else { 7000 }; let chunk1 = Arc::new( TestChunk::new("t") @@ -385,20 +385,20 @@ mod test { ); let schema = SchemaMerger::new() - .merge(&chunk1.schema()) + .merge(chunk1.schema()) .unwrap() - .merge(&chunk2.schema()) + .merge(chunk2.schema()) .unwrap() .build(); (schema, vec![chunk1, chunk2]) } - fn get_test_chunks() -> (Arc, Vec>) { + fn get_test_chunks() -> (Schema, Vec>) { test_chunks(false) } - fn get_test_overlapped_chunks() -> (Arc, Vec>) { + fn get_test_overlapped_chunks() -> (Schema, Vec>) { test_chunks(true) } } diff --git a/iox_query/src/frontend/common.rs b/iox_query/src/frontend/common.rs index 7074054228..f870b23304 100644 --- a/iox_query/src/frontend/common.rs +++ b/iox_query/src/frontend/common.rs @@ -63,7 +63,7 @@ impl std::fmt::Debug for ScanPlan { impl ScanPlan { /// Return the schema of the source (the merged schema across all tables) - pub fn schema(&self) -> Arc { + pub fn schema(&self) -> &Schema { self.provider.iox_schema() } } @@ -90,7 +90,7 @@ pub struct ScanPlanBuilder<'a> { table_name: Arc, /// The schema of the resulting table (any chunks that don't have /// all the necessary columns will be extended appropriately) - table_schema: Arc, + table_schema: &'a Schema, chunks: Vec>, /// The sort key that describes the desired output sort order output_sort_key: Option, @@ -100,7 +100,7 @@ pub struct ScanPlanBuilder<'a> { } impl<'a> ScanPlanBuilder<'a> { - pub fn new(table_name: Arc, table_schema: Arc, ctx: IOxSessionContext) -> Self { + pub fn new(table_name: Arc, table_schema: &'a Schema, ctx: IOxSessionContext) -> Self { Self { ctx, table_name, @@ -158,7 +158,7 @@ impl<'a> ScanPlanBuilder<'a> { // 
Prepare the plan for the table let mut builder = ProviderBuilder::new( Arc::clone(&table_name), - table_schema, + table_schema.clone(), ctx.child_ctx("provider_builder"), ) .with_enable_deduplication(deduplication); @@ -193,7 +193,7 @@ impl<'a> ScanPlanBuilder<'a> { // Rewrite expression so it only refers to columns in this chunk let schema = provider.iox_schema(); trace!(%table_name, ?filter_expr, "Adding filter expr"); - let mut rewriter = MissingColumnsToNull::new(&schema); + let mut rewriter = MissingColumnsToNull::new(schema); let filter_expr = filter_expr .rewrite(&mut rewriter) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 33ad3d715b..35160b208c 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -309,7 +309,7 @@ impl InfluxRpcPlanner { let plan = Self::table_name_plan( ctx, Arc::clone(table_name), - schema, + &schema, predicate, chunks, )?; @@ -472,7 +472,7 @@ impl InfluxRpcPlanner { let mut ctx = ctx.child_ctx("tag_keys_plan"); ctx.set_metadata("table", table_name.to_string()); - let plan = self.tag_keys_plan(ctx, table_name, schema, predicate, chunks_full)?; + let plan = self.tag_keys_plan(ctx, table_name, &schema, predicate, chunks_full)?; if let Some(plan) = plan { builder = builder.append_other(plan) @@ -641,7 +641,7 @@ impl InfluxRpcPlanner { let mut ctx = ctx.child_ctx("scan_and_filter planning"); ctx.set_metadata("table", table_name.to_string()); - let scan_and_filter = ScanPlanBuilder::new(Arc::clone(table_name), schema, ctx) + let scan_and_filter = ScanPlanBuilder::new(Arc::clone(table_name), &schema, ctx) .with_chunks(chunks_full) .with_predicate(predicate) .build()?; @@ -884,7 +884,7 @@ impl InfluxRpcPlanner { Aggregate::None => Self::read_filter_plan( ctx.child_ctx("read_filter plan"), table_name, - Arc::clone(&schema), + schema, predicate, chunks, ), @@ -963,7 +963,7 @@ impl InfluxRpcPlanner { &self, ctx: IOxSessionContext, table_name: &str, - schema: Arc, + schema: &Schema, predicate: &Predicate, chunks: Vec>, ) -> Result> { @@ -1029,7 +1029,7 @@ impl InfluxRpcPlanner { fn field_columns_plan( ctx: IOxSessionContext, table_name: Arc, - schema: Arc, + schema: &Schema, predicate: &Predicate, chunks: Vec>, ) -> Result { @@ -1085,7 +1085,7 @@ impl InfluxRpcPlanner { fn table_name_plan( ctx: IOxSessionContext, table_name: Arc, - schema: Arc, + schema: &Schema, predicate: &Predicate, chunks: Vec>, ) -> Result { @@ -1100,7 +1100,7 @@ impl InfluxRpcPlanner { .build()?; // Select only fields requested - let select_exprs: Vec<_> = filtered_fields_iter(&scan_and_filter.schema(), predicate) + let select_exprs: Vec<_> = filtered_fields_iter(scan_and_filter.schema(), predicate) .map(|field| field.name.as_expr()) .collect(); @@ -1130,7 +1130,7 @@ impl InfluxRpcPlanner { fn read_filter_plan( ctx: IOxSessionContext, table_name: &str, - schema: Arc, + schema: &Schema, predicate: &Predicate, chunks: Vec>, ) -> Result { @@ -1143,7 +1143,7 @@ impl InfluxRpcPlanner { .with_chunks(chunks) .build()?; - let schema = scan_and_filter.schema(); + let schema = scan_and_filter.provider.iox_schema(); let tags_and_timestamp: Vec<_> = scan_and_filter .schema() @@ -1164,7 +1164,7 @@ impl InfluxRpcPlanner { let tags_fields_and_timestamps: Vec = schema .tags_iter() .map(|field| field.name().as_expr()) - .chain(filtered_fields_iter(&schema, predicate).map(|f| f.expr)) + .chain(filtered_fields_iter(schema, predicate).map(|f| f.expr)) .chain(schema.time_iter().map(|field| field.name().as_expr())) .collect(); @@ -1179,7 
+1179,7 @@ impl InfluxRpcPlanner { .map(|field| Arc::from(field.name().as_str())) .collect(); - let field_columns = filtered_fields_iter(&schema, predicate) + let field_columns = filtered_fields_iter(schema, predicate) .map(|field| Arc::from(field.name)) .collect(); @@ -1238,7 +1238,7 @@ impl InfluxRpcPlanner { fn read_group_plan( ctx: IOxSessionContext, table_name: &str, - schema: Arc, + schema: &Schema, predicate: &Predicate, agg: Aggregate, chunks: Vec>, @@ -1255,7 +1255,7 @@ impl InfluxRpcPlanner { // order the tag columns so that the group keys come first (we // will group and // order in the same order) - let schema = scan_and_filter.schema(); + let schema = scan_and_filter.provider.iox_schema(); let tag_columns: Vec<_> = schema.tags_iter().map(|f| f.name() as &str).collect(); // Group by all tag columns @@ -1267,7 +1267,7 @@ impl InfluxRpcPlanner { let AggExprs { agg_exprs, field_columns, - } = AggExprs::try_new_for_read_group(agg, &schema, predicate)?; + } = AggExprs::try_new_for_read_group(agg, schema, predicate)?; let plan_builder = scan_and_filter .plan_builder @@ -1347,7 +1347,7 @@ impl InfluxRpcPlanner { fn read_window_aggregate_plan( ctx: IOxSessionContext, table_name: &str, - schema: Arc, + schema: &Schema, predicate: &Predicate, agg: Aggregate, every: WindowDuration, @@ -1363,7 +1363,7 @@ impl InfluxRpcPlanner { .with_chunks(chunks) .build()?; - let schema = scan_and_filter.schema(); + let schema = scan_and_filter.provider.iox_schema(); // Group by all tag columns and the window bounds let window_bound = make_window_bound_expr(TIME_COLUMN_NAME.as_expr(), every, offset) @@ -1378,7 +1378,7 @@ impl InfluxRpcPlanner { let AggExprs { agg_exprs, field_columns, - } = AggExprs::try_new_for_read_window_aggregate(agg, &schema, predicate)?; + } = AggExprs::try_new_for_read_window_aggregate(agg, schema, predicate)?; // sort by the group by expressions as well let sort_exprs = group_exprs @@ -1436,7 +1436,7 @@ fn table_chunk_stream<'a>( let table_schema = namespace.table_schema(table_name); let projection = match table_schema { Some(table_schema) => { - columns_in_predicates(need_fields, table_schema, table_name, predicate) + columns_in_predicates(need_fields, &table_schema, table_name, predicate) } None => None, }; @@ -1470,7 +1470,7 @@ fn table_chunk_stream<'a>( // in the predicate. fn columns_in_predicates( need_fields: bool, - table_schema: Arc, + table_schema: &Schema, table_name: &str, predicate: &Predicate, ) -> Option> { @@ -1560,7 +1560,7 @@ where &'a str, &'a Predicate, Vec>, - Arc, + &'a Schema, ) -> Result

+ Clone + Send @@ -1592,7 +1592,7 @@ where table_name: table_name.as_ref(), })?; - f(&ctx, table_name, predicate, chunks, schema) + f(&ctx, table_name, predicate, chunks, &schema) } }) .try_collect() @@ -2004,25 +2004,24 @@ mod tests { // test 1: empty predicate without need_fields let predicate = Predicate::new(); let need_fields = false; - let projection = columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate); + let projection = columns_in_predicates(need_fields, &schema, table, &predicate); assert_eq!(projection, None); // test 2: empty predicate with need_fields let need_fields = true; - let projection = columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate); + let projection = columns_in_predicates(need_fields, &schema, table, &predicate); assert_eq!(projection, None); // test 3: predicate on tag without need_fields let predicate = Predicate::new().with_expr(col("foo").eq(lit("some_thing"))); let need_fields = false; - let projection = - columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap(); + let projection = columns_in_predicates(need_fields, &schema, table, &predicate).unwrap(); // return index of foo assert_eq!(projection, vec![1]); // test 4: predicate on tag with need_fields let need_fields = true; - let projection = columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate); + let projection = columns_in_predicates(need_fields, &schema, table, &predicate); // return None means all fields assert_eq!(projection, None); @@ -2032,7 +2031,7 @@ mod tests { .with_field_columns(vec!["i64_field".to_string()]); let need_fields = false; let mut projection = - columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap(); + columns_in_predicates(need_fields, &schema, table, &predicate).unwrap(); projection.sort(); // return indexes of i64_field and foo assert_eq!(projection, vec![1, 2]); @@ -2040,7 +2039,7 @@ mod tests { // test 6: predicate on tag with field_columns with need_fields let need_fields = true; let mut projection = - columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap(); + columns_in_predicates(need_fields, &schema, table, &predicate).unwrap(); projection.sort(); // return indexes of foo and index of i64_field assert_eq!(projection, vec![1, 2]); @@ -2051,7 +2050,7 @@ mod tests { .with_field_columns(vec!["i64_field".to_string()]); let need_fields = false; let mut projection = - columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap(); + columns_in_predicates(need_fields, &schema, table, &predicate).unwrap(); projection.sort(); // return indexes of bard and i64_field assert_eq!(projection, vec![0, 2]); @@ -2059,7 +2058,7 @@ mod tests { // test 7: predicate on tag and field with field_columns with need_fields let need_fields = true; let mut projection = - columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap(); + columns_in_predicates(need_fields, &schema, table, &predicate).unwrap(); projection.sort(); // return indexes of bard and i64_field assert_eq!(projection, vec![0, 2]); diff --git a/iox_query/src/frontend/reorg.rs b/iox_query/src/frontend/reorg.rs index dcb3860140..b7686f0192 100644 --- a/iox_query/src/frontend/reorg.rs +++ b/iox_query/src/frontend/reorg.rs @@ -75,7 +75,7 @@ impl ReorgPlanner { pub fn compact_plan( &self, table_name: Arc, - schema: Arc, + schema: &Schema, chunks: I, output_sort_key: SortKey, ) -> Result @@ -150,7 +150,7 @@ impl ReorgPlanner { pub fn split_plan( 
&self, table_name: Arc, - schema: Arc, + schema: &Schema, chunks: I, output_sort_key: SortKey, split_times: Vec, @@ -214,7 +214,7 @@ mod test { use super::*; - async fn get_test_chunks() -> (Arc, Vec>) { + async fn get_test_chunks() -> (Schema, Vec>) { // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( TestChunk::new("t") @@ -261,16 +261,16 @@ mod test { assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await); let schema = SchemaMerger::new() - .merge(&chunk1.schema()) + .merge(chunk1.schema()) .unwrap() - .merge(&chunk2.schema()) + .merge(chunk2.schema()) .unwrap() .build(); (schema, vec![chunk1, chunk2]) } - async fn get_sorted_test_chunks() -> (Arc, Vec>) { + async fn get_sorted_test_chunks() -> (Schema, Vec>) { // Chunk 1 let chunk1 = Arc::new( TestChunk::new("t") @@ -307,7 +307,7 @@ mod test { ]; assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await); - (chunk1.schema(), vec![chunk1, chunk2]) + (chunk1.schema().clone(), vec![chunk1, chunk2]) } #[tokio::test] @@ -333,7 +333,7 @@ mod test { .build(); let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .compact_plan(Arc::from("t"), Arc::clone(&schema), chunks, sort_key) + .compact_plan(Arc::from("t"), &schema, chunks, sort_key) .expect("created compact plan"); let physical_plan = executor @@ -370,7 +370,7 @@ mod test { .build(); let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .compact_plan(Arc::from("t"), schema, chunks, sort_key) + .compact_plan(Arc::from("t"), &schema, chunks, sort_key) .expect("created compact plan"); let executor = Executor::new_testing(); @@ -421,7 +421,7 @@ mod test { // split on 1000 should have timestamps 1000, 5000, and 7000 let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000]) + .split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000]) .expect("created compact plan"); let executor = Executor::new_testing(); @@ -485,7 +485,7 @@ mod test { // split on 1000 and 7000 let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000, 7000]) + .split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000, 7000]) .expect("created compact plan"); let executor = Executor::new_testing(); @@ -561,7 +561,7 @@ mod test { // split on 1000 and 7000 let _split_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .split_plan(Arc::from("t"), schema, chunks, sort_key, vec![]) // reason of panic: empty split_times + .split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![]) // reason of panic: empty split_times .expect("created compact plan"); } @@ -580,7 +580,7 @@ mod test { // split on 1000 and 7000 let _split_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) - .split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000, 500]) // reason of panic: split_times not in ascending order + .split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000, 500]) // reason of panic: split_times not in ascending order .expect("created compact plan"); } } diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index e074015c48..e9ea45b914 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -44,7 +44,7 @@ pub trait QueryChunkMeta { fn summary(&self) -> Arc; /// return a reference to the summary of the data held in this chunk - fn schema(&self) -> Arc; + fn schema(&self) -> &Schema; /// Return a reference to the chunk's partition sort key if any. 
/// Only persisted chunk has its partition sort key @@ -191,7 +191,7 @@ impl QueryChunkData { /// Read data into [`RecordBatch`]es. This is mostly meant for testing! pub async fn read_to_batches( self, - schema: Arc, + schema: &Schema, session_ctx: &SessionContext, ) -> Vec { match self { @@ -274,7 +274,7 @@ where self.as_ref().summary() } - fn schema(&self) -> Arc { + fn schema(&self) -> &Schema { self.as_ref().schema() } @@ -303,7 +303,7 @@ impl QueryChunkMeta for Arc { self.as_ref().summary() } - fn schema(&self) -> Arc { + fn schema(&self) -> &Schema { self.as_ref().schema() } diff --git a/iox_query/src/plan/influxql/test_utils.rs b/iox_query/src/plan/influxql/test_utils.rs index d7ef50ac88..0d905ca696 100644 --- a/iox_query/src/plan/influxql/test_utils.rs +++ b/iox_query/src/plan/influxql/test_utils.rs @@ -8,7 +8,6 @@ use influxdb_influxql_parser::select::{Field, SelectStatement}; use influxdb_influxql_parser::statement::Statement; use predicate::rpc_predicate::QueryNamespaceMeta; use schema::Schema; -use std::sync::Arc; /// Returns the first `Field` of the `SELECT` statement. pub(crate) fn get_first_field(s: &str) -> Field { @@ -86,8 +85,8 @@ impl QueryNamespaceMeta for MockNamespace { .collect() } - fn table_schema(&self, table_name: &str) -> Option> { + fn table_schema(&self, table_name: &str) -> Option { let c = self.chunks.iter().find(|x| x.table_name() == table_name)?; - Some(c.schema()) + Some(c.schema().clone()) } } diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index 5801099c59..9443fba6c5 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -100,7 +100,7 @@ pub trait ChunkPruner: Sync + Send + std::fmt::Debug { fn prune_chunks( &self, table_name: &str, - table_schema: Arc, + table_schema: &Schema, chunks: Vec>, predicate: &Predicate, ) -> Result>>; @@ -112,7 +112,7 @@ pub trait ChunkPruner: Sync + Send + std::fmt::Debug { #[derive(Debug)] pub struct ProviderBuilder { table_name: Arc, - schema: Arc, + schema: Schema, chunks: Vec>, output_sort_key: Option, deduplication: bool, @@ -122,7 +122,7 @@ pub struct ProviderBuilder { } impl ProviderBuilder { - pub fn new(table_name: Arc, schema: Arc, ctx: IOxSessionContext) -> Self { + pub fn new(table_name: Arc, schema: Schema, ctx: IOxSessionContext) -> Self { Self { table_name, schema, @@ -173,7 +173,7 @@ impl ProviderBuilder { pub struct ChunkTableProvider { table_name: Arc, /// The IOx schema (wrapper around Arrow Schemaref) for this table - iox_schema: Arc, + iox_schema: Schema, /// The chunks chunks: Vec>, /// The desired output sort key if any @@ -187,8 +187,8 @@ pub struct ChunkTableProvider { impl ChunkTableProvider { /// Return the IOx schema view for the data provided by this provider - pub fn iox_schema(&self) -> Arc { - Arc::clone(&self.iox_schema) + pub fn iox_schema(&self) -> &Schema { + &self.iox_schema } /// Return the Arrow schema view for the data provided by this provider @@ -229,10 +229,7 @@ impl TableProvider for ChunkTableProvider { let chunks: Vec> = self.chunks.to_vec(); // Figure out the schema of the requested output - let scan_schema = match projection { - Some(indices) => Arc::new(self.iox_schema.select_by_indices(indices)), - None => Arc::clone(&self.iox_schema), - }; + let schema = projection.map(|indices| self.iox_schema.select_by_indices(indices)); // This debug shows the self.arrow_schema() includes all columns in all chunks // which means the schema of all chunks are merged before invoking this scan @@ -247,7 +244,7 @@ impl TableProvider for ChunkTableProvider 
{ let plan = deduplicate.build_scan_plan( Arc::clone(&self.table_name), - scan_schema, + schema.as_ref().unwrap_or(&self.iox_schema), chunks, predicate, self.output_sort_key.clone(), @@ -502,7 +499,7 @@ impl Deduplicater { pub(crate) fn build_scan_plan( mut self, table_name: Arc, - output_schema: Arc, + output_schema: &Schema, chunks: Vec>, mut predicate: Predicate, output_sort_key: Option, @@ -532,7 +529,7 @@ impl Deduplicater { let mut non_duplicate_plans = Self::build_plans_for_non_duplicates_chunks( self.ctx.child_ctx("build_plans_for_non_duplicates_chunks"), - Arc::clone(&output_schema), + output_schema, chunks, predicate, output_sort_key.as_ref(), @@ -593,7 +590,7 @@ impl Deduplicater { plans.push(Self::build_deduplicate_plan_for_overlapped_chunks( self.ctx .child_ctx("build_deduplicate_plan_for_overlapped_chunks"), - Arc::clone(&output_schema), + output_schema, overlapped_chunks, predicate.clone(), &chunks_dedup_sort_key, @@ -622,7 +619,7 @@ impl Deduplicater { plans.push(Self::build_deduplicate_plan_for_chunk_with_duplicates( self.ctx .child_ctx("build_deduplicate_plan_for_chunk_with_duplicates"), - Arc::clone(&output_schema), + output_schema, chunk_with_duplicates, predicate.clone(), &chunk_dedup_sort_key, @@ -640,7 +637,7 @@ impl Deduplicater { ); let mut non_duplicate_plans = Self::build_plans_for_non_duplicates_chunks( self.ctx.child_ctx("build_plans_for_non_duplicates_chunks"), - Arc::clone(&output_schema), + output_schema, chunks, predicate, output_sort_key.as_ref(), @@ -808,7 +805,7 @@ impl Deduplicater { ///``` fn build_deduplicate_plan_for_overlapped_chunks( ctx: IOxSessionContext, - output_schema: Arc, + output_schema: &Schema, chunks: Vec>, // These chunks are identified overlapped predicate: Predicate, output_sort_key: &SortKey, @@ -829,7 +826,7 @@ impl Deduplicater { }; let pk_schema = Self::compute_pk_schema(&chunks, schema_interner); - let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner); + let input_schema = Self::compute_input_schema(output_schema, &pk_schema, schema_interner); debug!( ?output_schema, @@ -844,7 +841,7 @@ impl Deduplicater { .map(|chunk| { Self::build_sort_plan_for_read_filter( ctx.child_ctx("build_sort_plan_for_read_filter"), - Arc::clone(&input_schema), + &input_schema, Arc::clone(chunk), predicate.clone(), Some(output_sort_key), @@ -900,7 +897,7 @@ impl Deduplicater { ///``` fn build_deduplicate_plan_for_chunk_with_duplicates( ctx: IOxSessionContext, - output_schema: Arc, + output_schema: &Schema, chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, output_sort_key: &SortKey, @@ -909,10 +906,10 @@ impl Deduplicater { // This will practically never matter because this can only happen for in-memory chunks which are currently // backed by RecordBatches and these don't do anything with the predicate at all. However to prevent weird // future issues, we still transform the predicate here. 
(@crepererum, 2022-11-16) - let predicate = predicate.push_through_dedup(&chunk.schema()); + let predicate = predicate.push_through_dedup(chunk.schema()); let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)], schema_interner); - let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner); + let input_schema = Self::compute_input_schema(output_schema, &pk_schema, schema_interner); debug!( ?output_schema, @@ -927,7 +924,7 @@ impl Deduplicater { // Create the 2 bottom nodes RecordBatchesExec and SortExec let plan = Self::build_sort_plan_for_read_filter( ctx.child_ctx("build_sort_plan_for_read_filter"), - Arc::clone(&input_schema), + &input_schema, Arc::clone(&chunks[0]), predicate, Some(output_sort_key), @@ -963,7 +960,7 @@ impl Deduplicater { ///``` fn add_projection_node_if_needed( - output_schema: Arc, + output_schema: &Schema, input: Arc, ) -> Result> { let input_schema = input.schema(); @@ -1042,7 +1039,7 @@ impl Deduplicater { ///``` fn build_sort_plan_for_read_filter( ctx: IOxSessionContext, - output_schema: Arc, + output_schema: &Schema, chunk: Arc, predicate: Predicate, // This is the select predicate of the query output_sort_key: Option<&SortKey>, @@ -1066,7 +1063,7 @@ impl Deduplicater { trace!("Build sort plan for a single chunk. Sort node won't be added if the plan is already sorted"); let mut schema_merger = SchemaMerger::new() .with_interner(schema_interner) - .merge(&output_schema) + .merge(output_schema) .unwrap(); let chunk_schema = chunk.schema(); trace!(?chunk_schema, "chunk schema"); @@ -1097,7 +1094,7 @@ impl Deduplicater { // Create the bottom node RecordBatchesExec for this chunk let mut input = chunks_to_physical_nodes( - input_schema, + &input_schema, output_sort_key, vec![Arc::clone(&chunk)], predicate, @@ -1206,7 +1203,7 @@ impl Deduplicater { // And some optional operators on top such as applying delete predicates or sort the chunk fn build_plan_for_non_duplicates_chunk( ctx: IOxSessionContext, - output_schema: Arc, + output_schema: &Schema, chunk: Arc, // This chunk is identified having no duplicates predicate: Predicate, output_sort_key: Option<&SortKey>, @@ -1256,7 +1253,7 @@ impl Deduplicater { #[allow(clippy::too_many_arguments)] fn build_plans_for_non_duplicates_chunks( ctx: IOxSessionContext, - output_schema: Arc, + output_schema: &Schema, chunks: Chunks, // These chunks is identified having no duplicates predicate: Predicate, output_sort_key: Option<&SortKey>, @@ -1291,7 +1288,7 @@ impl Deduplicater { .map(|chunk| { Self::build_plan_for_non_duplicates_chunk( ctx.child_ctx("build_plan_for_non_duplicates_chunk"), - Arc::clone(&output_schema), + output_schema, Arc::clone(chunk), predicate.clone(), output_sort_key, @@ -1307,7 +1304,7 @@ impl Deduplicater { fn compute_pk_schema<'a>( chunks: impl IntoIterator>, schema_interner: &mut SchemaInterner, - ) -> Arc { + ) -> Schema { let mut schema_merger = SchemaMerger::new().with_interner(schema_interner); for chunk in chunks { let chunk_schema = chunk.schema(); @@ -1330,10 +1327,10 @@ impl Deduplicater { fn compute_chunks_schema<'a>( chunks: impl IntoIterator>, schema_interner: &mut SchemaInterner, - ) -> Arc { + ) -> Schema { let mut schema_merger = SchemaMerger::new().with_interner(schema_interner); for chunk in chunks { - schema_merger = schema_merger.merge(&chunk.schema()).unwrap(); + schema_merger = schema_merger.merge(chunk.schema()).unwrap(); } schema_merger.build() @@ -1345,7 +1342,7 @@ impl Deduplicater { output_schema: &Schema, pk_schema: &Schema, schema_interner: 
&mut SchemaInterner, - ) -> Arc { + ) -> Schema { SchemaMerger::new() .with_interner(schema_interner) .merge(output_schema) @@ -1621,7 +1618,7 @@ mod test { let sort_plan = Deduplicater::build_sort_plan_for_read_filter( IOxSessionContext::with_testing(), - Arc::clone(&schema), + schema, Arc::clone(&chunk), Predicate::default(), Some(&sort_key.clone()), @@ -1775,7 +1772,7 @@ mod test { // All chunks in one single scan let plans = Deduplicater::build_plans_for_non_duplicates_chunks( IOxSessionContext::with_testing(), - Arc::clone(&schema), + schema, Chunks::split_overlapped_chunks(vec![Arc::clone(&chunk1), Arc::clone(&chunk2)], false) .unwrap(), Predicate::default(), @@ -1939,7 +1936,7 @@ mod test { ) as Arc; // Datafusion schema of the chunk // the same for 2 chunks - let schema = chunk1.schema(); + let schema = chunk1.schema().clone(); let chunks = vec![chunk1, chunk2]; // data in its original form @@ -1964,7 +1961,7 @@ mod test { let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( IOxSessionContext::with_testing(), - Arc::clone(&schema), + &schema, chunks, Predicate::default(), &output_sort_key, @@ -2024,7 +2021,7 @@ mod test { let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( IOxSessionContext::with_testing(), - schema, + &schema, vec![chunk1, chunk2], Predicate::default(), &output_sort_key, @@ -2100,7 +2097,7 @@ mod test { let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( IOxSessionContext::with_testing(), - Arc::new(schema), + &schema, chunks, Predicate::default(), &output_sort_key, @@ -2199,7 +2196,7 @@ mod test { let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( IOxSessionContext::with_testing(), - Arc::new(schema), + &schema, chunks, Predicate::default(), &output_sort_key, @@ -2267,11 +2264,11 @@ mod test { // With provided stats, the computed key will be (tag2, tag1, tag3, time) // Requested output schema == the schema for all three let schema = SchemaMerger::new() - .merge(chunk1.schema().as_ref()) + .merge(chunk1.schema()) .unwrap() - .merge(chunk2.schema().as_ref()) + .merge(chunk2.schema()) .unwrap() - .merge(chunk3.schema().as_ref()) + .merge(chunk3.schema()) .unwrap() .build(); @@ -2306,7 +2303,7 @@ mod test { let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]); let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks( IOxSessionContext::with_testing(), - schema, + &schema, chunks, Predicate::default(), &output_sort_key, @@ -2364,7 +2361,7 @@ mod test { ) as Arc; // Datafusion schema of the chunk - let schema = chunk.schema(); + let schema = chunk.schema().clone(); let chunks = vec![chunk]; // data in its original form @@ -2385,7 +2382,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), None, @@ -2400,7 +2397,7 @@ mod test { let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) + .build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // The data will stay in their original order @@ -2433,7 +2430,7 @@ mod 
test { ) as Arc; // Datafusion schema of the chunk - let schema = chunk.schema(); + let schema = chunk.schema().clone(); let chunks = vec![chunk]; // data in its original form @@ -2459,7 +2456,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), None, @@ -2487,7 +2484,7 @@ mod test { let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) + .build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // Deduplication is disabled, the output shoudl be the same as the original data @@ -2546,13 +2543,12 @@ mod test { .timestamp() .build() .unwrap(); - let schema = Arc::new(schema); let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()); let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&Arc::clone(&schema)), + &schema, chunks.clone(), Predicate::default(), None, @@ -2581,13 +2577,7 @@ mod test { let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false); let plan = deduplicator - .build_scan_plan( - Arc::from("t"), - Arc::clone(&schema), - chunks, - Predicate::default(), - None, - ) + .build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // Deduplication is disabled, the output should include all rows but only 2 selected columns @@ -2655,7 +2645,7 @@ mod test { ) as Arc; // Datafusion schema of the chunk - let schema = chunk1.schema(); + let schema = chunk1.schema().clone(); let chunks = vec![chunk1, chunk2]; // data in its original form @@ -2686,7 +2676,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), None, @@ -2715,7 +2705,7 @@ mod test { let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) + .build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None) .unwrap(); let batch = test_collect(plan).await; // Deduplication is disabled, the output shoudl be the same as the original data @@ -2816,7 +2806,7 @@ mod test { ) as Arc; // Datafusion schema of the chunk - let schema = chunk1.schema(); + let schema = chunk1.schema().clone(); let chunks = vec![chunk1, chunk2, chunk3, chunk4]; // data in its original form @@ -2855,7 +2845,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), None, @@ -2935,7 +2925,7 @@ mod test { let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false); let plan = deduplicator - .build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None) + .build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None) .unwrap(); // Plan is very simple with one single RecordBatchesExec that includes 4 chunks @@ -3042,7 +3032,7 @@ mod test { ) as Arc; // Datafusion schema of the chunk - let schema = chunk1.schema(); + let schema = chunk1.schema().clone(); let chunks = vec![chunk1, chunk2, chunk3, chunk4]; // data in its original form @@ -3081,7 +3071,7 @@ mod test { let plan = deduplicator .build_scan_plan( 
Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), Some(sort_key.clone()), // Ask to sort the plan output @@ -3158,7 +3148,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - schema, + &schema, chunks, Predicate::default(), Some(sort_key.clone()), @@ -3260,13 +3250,13 @@ mod test { .with_sort_key(sort_key.clone()), // signal the chunk is sorted ) as Arc; - let schema = chunk1.schema(); + let schema = chunk1.schema().clone(); let chunks = vec![chunk1, chunk2, chunk3, chunk4]; let deduplicator = Deduplicater::new(IOxSessionContext::with_testing()); let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), Some(sort_key.clone()), @@ -3325,7 +3315,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - schema, + &schema, chunks, Predicate::default(), Some(sort_key), @@ -3488,7 +3478,7 @@ mod test { .with_sort_key(sort_key.clone()), // signal the chunk is sorted ) as Arc; - let schema = chunk1_1.schema(); + let schema = chunk1_1.schema().clone(); let chunks = vec![ chunk1_1, chunk1_2, chunk1_3, chunk1_4, chunk2_1, chunk2_2, chunk2_3, chunk2_4, chunk2_5, chunk2_6, @@ -3497,7 +3487,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - Arc::clone(&schema), + &schema, chunks.clone(), Predicate::default(), Some(sort_key.clone()), @@ -3557,7 +3547,7 @@ mod test { let plan = deduplicator .build_scan_plan( Arc::from("t"), - schema, + &schema, chunks, Predicate::default(), Some(sort_key), diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index 36b89e6feb..e59af96b55 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -119,7 +119,7 @@ fn combine_sort_key( /// pushdown ([`RecordBatchesExec`] has NO builtin filter function). Delete predicates are NOT applied at all. The /// caller is responsible for wrapping the output node into appropriate filter nodes. pub fn chunks_to_physical_nodes( - iox_schema: Arc, + iox_schema: &Schema, output_sort_key: Option<&SortKey>, chunks: Vec>, predicate: Predicate, diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index e13d9b87bc..a079c634eb 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -72,8 +72,8 @@ pub trait PruningObserver { /// filtering those where the predicate can be proven to evaluate to /// `false` for every single row. pub fn prune_chunks( - table_schema: Arc, - chunks: &Vec>, + table_schema: &Schema, + chunks: &[Arc], predicate: &Predicate, ) -> Result, NotPrunedReason> { let num_chunks = chunks.len(); @@ -85,8 +85,8 @@ pub fn prune_chunks( /// Given a `Vec` of pruning summaries, return a `Vec` where `false` indicates that the /// predicate can be proven to evaluate to `false` for every single row. 
pub fn prune_summaries( - table_schema: Arc, - summaries: &Vec>, + table_schema: &Schema, + summaries: &[Arc], predicate: &Predicate, ) -> Result, NotPrunedReason> { let filter_expr = match predicate.filter_expr() { @@ -108,7 +108,7 @@ pub fn prune_summaries( }; let statistics = ChunkPruningStatistics { - table_schema: table_schema.as_ref(), + table_schema, summaries, }; @@ -126,7 +126,7 @@ pub fn prune_summaries( /// interface required by [`PruningPredicate`] struct ChunkPruningStatistics<'a> { table_schema: &'a Schema, - summaries: &'a Vec>, + summaries: &'a [Arc], } impl<'a> ChunkPruningStatistics<'a> { @@ -263,7 +263,7 @@ mod test { let c1 = Arc::new(TestChunk::new("chunk1")); let predicate = Predicate::new(); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result, Err(NotPrunedReason::NoExpressionOnPredicate)); } @@ -281,7 +281,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").gt(lit(100.0f64))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![false]); } @@ -299,7 +299,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").gt(lit(100i64))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![false]); } @@ -318,7 +318,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").gt(lit(100u64))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![false]); } @@ -335,7 +335,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1")); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![false; 1]); } @@ -355,7 +355,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").gt(lit("z"))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![false]); } @@ -372,7 +372,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").lt(lit(100.0f64))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![true]); } @@ -390,7 +390,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").lt(lit(100i64))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![true]); } @@ -408,7 +408,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").lt(lit(100u64))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![true]); } @@ -426,7 +426,7 @@ mod test { let predicate = Predicate::new().with_expr(col("column1")); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), 
&[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![true]); } @@ -446,14 +446,14 @@ mod test { let predicate = Predicate::new().with_expr(col("column1").lt(lit("z"))); - let result = prune_chunks(c1.schema(), &vec![c1], &predicate); + let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate); assert_eq!(result.expect("pruning succeeds"), vec![true]); } - fn merge_schema(chunks: &[Arc]) -> Arc { + fn merge_schema(chunks: &[Arc]) -> Schema { let mut merger = SchemaMerger::new(); for chunk in chunks { - merger = merger.merge(chunk.schema().as_ref()).unwrap(); + merger = merger.merge(chunk.schema()).unwrap(); } merger.build() } @@ -491,7 +491,7 @@ mod test { let chunks = vec![c1, c2, c3, c4]; let schema = merge_schema(&chunks); - let result = prune_chunks(schema, &chunks, &predicate); + let result = prune_chunks(&schema, &chunks, &predicate); assert_eq!( result.expect("pruning succeeds"), @@ -549,7 +549,7 @@ mod test { let chunks = vec![c1, c2, c3, c4, c5, c6]; let schema = merge_schema(&chunks); - let result = prune_chunks(schema, &chunks, &predicate); + let result = prune_chunks(&schema, &chunks, &predicate); assert_eq!( result.expect("pruning succeeds"), @@ -587,7 +587,7 @@ mod test { let chunks = vec![c1, c2, c3]; let schema = merge_schema(&chunks); - let result = prune_chunks(schema, &chunks, &predicate); + let result = prune_chunks(&schema, &chunks, &predicate); assert_eq!(result.expect("pruning succeeds"), vec![false, true, true]); } @@ -645,7 +645,7 @@ mod test { let chunks = vec![c1, c2, c3]; let schema = merge_schema(&chunks); - let result = prune_chunks(schema, &chunks, &predicate); + let result = prune_chunks(&schema, &chunks, &predicate); assert_eq!(result.expect("pruning succeeds"), vec![true, false, false]); } @@ -706,7 +706,7 @@ mod test { let chunks = vec![c1, c2, c3, c4, c5, c6]; let schema = merge_schema(&chunks); - let result = prune_chunks(schema, &chunks, &predicate); + let result = prune_chunks(&schema, &chunks, &predicate); assert_eq!( result.expect("Pruning succeeds"), diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 7ae4e56ad1..a0c6a54fe7 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -142,7 +142,7 @@ impl QueryNamespace for TestDatabase { } impl QueryNamespaceMeta for TestDatabase { - fn table_schema(&self, table_name: &str) -> Option> { + fn table_schema(&self, table_name: &str) -> Option { let mut merger = SchemaMerger::new(); let mut found_one = false; @@ -150,7 +150,7 @@ impl QueryNamespaceMeta for TestDatabase { for partition in partitions.values() { for chunk in partition.values() { if chunk.table_name() == table_name { - merger = merger.merge(&chunk.schema()).expect("consistent schemas"); + merger = merger.merge(chunk.schema()).expect("consistent schemas"); found_one = true; } } @@ -188,7 +188,7 @@ pub struct TestChunk { table_name: String, /// Schema of the table - schema: Arc, + schema: Schema, /// Return value for summary() table_summary: TableSummary, @@ -294,7 +294,7 @@ impl TestChunk { let table_name = table_name.into(); Self { table_name, - schema: Arc::new(SchemaBuilder::new().build().unwrap()), + schema: SchemaBuilder::new().build().unwrap(), table_summary: TableSummary::default(), id: ChunkId::new_test(0), may_contain_pk_duplicates: Default::default(), @@ -552,9 +552,7 @@ impl TestChunk { ) -> Self { let mut merger = SchemaMerger::new(); merger = merger.merge(&new_column_schema).unwrap(); - merger = merger - .merge(self.schema.as_ref()) - .expect("merging was successful"); + merger = 
merger.merge(&self.schema).expect("merging was successful"); self.schema = merger.build(); for i in 0..new_column_schema.len() { @@ -627,7 +625,7 @@ impl TestChunk { .collect::>(); let batch = - RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch"); if !self.quiet { println!("TestChunk batch data: {:#?}", batch); } @@ -667,7 +665,7 @@ impl TestChunk { .collect::>(); let batch = - RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch"); if !self.quiet { println!("TestChunk batch data: {:#?}", batch); } @@ -731,7 +729,7 @@ impl TestChunk { .collect::>(); let batch = - RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch"); self.table_data.push(Arc::new(batch)); self @@ -794,7 +792,7 @@ impl TestChunk { .collect::>(); let batch = - RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch"); self.table_data.push(Arc::new(batch)); self @@ -864,7 +862,7 @@ impl TestChunk { .collect::>(); let batch = - RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch"); self.table_data.push(Arc::new(batch)); self @@ -941,7 +939,7 @@ impl TestChunk { .collect::>(); let batch = - RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch"); self.table_data.push(Arc::new(batch)); self @@ -1069,8 +1067,8 @@ impl QueryChunkMeta for TestChunk { Arc::new(self.table_summary.clone()) } - fn schema(&self) -> Arc { - Arc::clone(&self.schema) + fn schema(&self) -> &Schema { + &self.schema } fn partition_sort_key(&self) -> Option<&SortKey> { diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 219552c4a4..7983f037df 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -424,11 +424,7 @@ impl TestTable { .collect(); let schema = table_schema.select_by_names(&selection).unwrap(); - let chunk = ParquetChunk::new( - Arc::new(file), - Arc::new(schema), - self.catalog.parquet_store.clone(), - ); + let chunk = ParquetChunk::new(Arc::new(file), schema, self.catalog.parquet_store.clone()); chunk .parquet_exec_input() .read_to_batches( @@ -624,7 +620,7 @@ impl TestPartition { ); let row_count = record_batch.num_rows(); assert!(row_count > 0, "Parquet file must have at least 1 row"); - let (record_batch, sort_key) = sort_batch(record_batch, schema.clone()); + let (record_batch, sort_key) = sort_batch(record_batch, &schema); let record_batch = dedup_batch(record_batch, &sort_key); let object_store_id = object_store_id.unwrap_or_else(Uuid::new_v4); @@ -974,7 +970,7 @@ impl TestParquetFile { } /// Get Parquet file schema. 
- pub async fn schema(&self) -> Arc { + pub async fn schema(&self) -> Schema { let table_schema = self.table.catalog_schema().await; let column_id_lookup = table_schema.column_id_map(); let selection: Vec<_> = self @@ -984,7 +980,7 @@ impl TestParquetFile { .map(|id| *column_id_lookup.get(id).unwrap()) .collect(); let table_schema: Schema = table_schema.clone().try_into().unwrap(); - Arc::new(table_schema.select_by_names(&selection).unwrap()) + table_schema.select_by_names(&selection).unwrap() } } @@ -1018,9 +1014,9 @@ pub fn now() -> Time { } /// Sort arrow record batch into arrow record batch and sort key. -fn sort_batch(record_batch: RecordBatch, schema: Schema) -> (RecordBatch, SortKey) { +fn sort_batch(record_batch: RecordBatch, schema: &Schema) -> (RecordBatch, SortKey) { // calculate realistic sort key - let sort_key = compute_sort_key(&schema, std::iter::once(&record_batch)); + let sort_key = compute_sort_key(schema, std::iter::once(&record_batch)); // set up sorting let mut sort_columns = Vec::with_capacity(record_batch.num_columns()); diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index d125f09ef2..5aa9084e67 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -18,7 +18,7 @@ pub struct ParquetChunk { parquet_file: Arc, /// Schema that goes with this table's parquet file - schema: Arc, + schema: Schema, /// Persists the parquet file within a namespace's relative path store: ParquetStorage, @@ -26,7 +26,7 @@ pub struct ParquetChunk { impl ParquetChunk { /// Create parquet chunk. - pub fn new(parquet_file: Arc, schema: Arc, store: ParquetStorage) -> Self { + pub fn new(parquet_file: Arc, schema: Schema, store: ParquetStorage) -> Self { Self { parquet_file, schema, @@ -55,9 +55,9 @@ impl ParquetChunk { mem::size_of_val(self) + self.parquet_file.size() - mem::size_of_val(&self.parquet_file) } - /// Infallably return the full schema (for all columns) for this chunk - pub fn schema(&self) -> Arc { - Arc::clone(&self.schema) + /// Infallibly return the full schema (for all columns) for this chunk + pub fn schema(&self) -> &Schema { + &self.schema } /// Return the columns names that belong to the given column selection diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index a2875f697d..470e2afc45 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -760,7 +760,7 @@ impl DecodedIoxParquetMetaData { } /// Read IOx schema from parquet metadata. - pub fn read_schema(&self) -> Result> { + pub fn read_schema(&self) -> Result { let file_metadata = self.md.file_metadata(); let arrow_schema = parquet_to_arrow_schema( @@ -776,10 +776,9 @@ impl DecodedIoxParquetMetaData { // as this metadata will vary from file to file let arrow_schema_ref = Arc::new(arrow_schema.with_metadata(Default::default())); - let schema: Schema = arrow_schema_ref + arrow_schema_ref .try_into() - .context(IoxFromArrowFailureSnafu {})?; - Ok(Arc::new(schema)) + .context(IoxFromArrowFailureSnafu {}) } /// Read IOx statistics (including timestamp range) from parquet metadata. diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index cf47cbd01e..e2b124ca0f 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -169,7 +169,9 @@ pub trait QueryNamespaceMeta { fn table_names(&self) -> Vec; /// Schema for a specific table if the table exists. 
- fn table_schema(&self, table_name: &str) -> Option>; + /// + /// TODO: Make this return Option<&Schema> + fn table_schema(&self, table_name: &str) -> Option; } /// Predicate that has been "specialized" / normalized for a @@ -204,13 +206,13 @@ pub trait QueryNamespaceMeta { /// ``` fn normalize_predicate( table_name: &str, - schema: Arc, + schema: Schema, predicate: &Predicate, ) -> DataFusionResult { let mut predicate = predicate.clone(); - let mut field_projections = FieldProjectionRewriter::new(Arc::clone(&schema)); - let mut missing_tag_columns = MissingTagColumnRewriter::new(Arc::clone(&schema)); + let mut field_projections = FieldProjectionRewriter::new(schema.clone()); + let mut missing_tag_columns = MissingTagColumnRewriter::new(schema.clone()); let mut field_value_exprs = vec![]; @@ -221,7 +223,7 @@ fn normalize_predicate( .exprs .into_iter() .map(|e| { - let simplifier = ExprSimplifier::new(SimplifyAdapter::new(schema.as_ref())); + let simplifier = ExprSimplifier::new(SimplifyAdapter::new(&schema)); debug!(?e, "rewriting expr"); @@ -349,10 +351,9 @@ mod tests { #[test] fn test_normalize_predicate_coerced() { - let schema = schema(); let predicate = normalize_predicate( "table", - Arc::clone(&schema), + schema(), &Predicate::new().with_expr(col("t1").eq(lit("f1"))), ) .unwrap(); @@ -470,8 +471,8 @@ mod tests { assert_eq!(predicate, expected); } - fn schema() -> Arc { - let schema = schema::builder::SchemaBuilder::new() + fn schema() -> Schema { + schema::builder::SchemaBuilder::new() .tag("t1") .tag("t2") .field("f1", DataType::Int64) @@ -479,9 +480,7 @@ mod tests { .field("f2", DataType::Int64) .unwrap() .build() - .unwrap(); - - Arc::new(schema) + .unwrap() } #[allow(dead_code)] diff --git a/predicate/src/rpc_predicate/column_rewrite.rs b/predicate/src/rpc_predicate/column_rewrite.rs index 2c24c1bbc4..eeed0202bc 100644 --- a/predicate/src/rpc_predicate/column_rewrite.rs +++ b/predicate/src/rpc_predicate/column_rewrite.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use datafusion::{ error::Result as DataFusionResult, logical_expr::expr_rewriter::ExprRewriter, prelude::*, scalar::ScalarValue, @@ -11,12 +9,12 @@ use schema::{InfluxColumnType, Schema}; #[derive(Debug)] pub(crate) struct MissingTagColumnRewriter { /// The input schema - schema: Arc, + schema: Schema, } impl MissingTagColumnRewriter { /// Create a new [`MissingTagColumnRewriter`] targeting the given schema - pub(crate) fn new(schema: Arc) -> Self { + pub(crate) fn new(schema: Schema) -> Self { Self { schema } } @@ -105,7 +103,7 @@ mod tests { .build() .unwrap(); - let mut rewriter = MissingTagColumnRewriter::new(Arc::new(schema)); + let mut rewriter = MissingTagColumnRewriter::new(schema); expr.rewrite(&mut rewriter).unwrap() } } diff --git a/predicate/src/rpc_predicate/field_rewrite.rs b/predicate/src/rpc_predicate/field_rewrite.rs index af3baf2f87..93146955d4 100644 --- a/predicate/src/rpc_predicate/field_rewrite.rs +++ b/predicate/src/rpc_predicate/field_rewrite.rs @@ -45,12 +45,12 @@ pub(crate) struct FieldProjectionRewriter { /// output field_predicates: Vec, /// The input schema (from where we know the field) - schema: Arc, + schema: Schema, } impl FieldProjectionRewriter { /// Create a new [`FieldProjectionRewriter`] targeting the given schema - pub(crate) fn new(schema: Arc) -> Self { + pub(crate) fn new(schema: Schema) -> Self { Self { field_predicates: vec![], schema, @@ -429,7 +429,7 @@ mod tests { "Running test\ninput: {:?}\nexpected_expr: {:?}\nexpected_field_columns: {:?}\n", input, exp_expr, 
exp_field_columns ); - let mut rewriter = FieldProjectionRewriter::new(Arc::clone(&schema)); + let mut rewriter = FieldProjectionRewriter::new(schema.clone()); let rewritten = rewriter.rewrite_field_exprs(input).unwrap(); assert_eq!(rewritten, exp_expr); @@ -471,9 +471,8 @@ mod tests { input, exp_error ); - let schema = Arc::clone(&schema); - let run_case = move || { - let mut rewriter = FieldProjectionRewriter::new(schema); + let run_case = || { + let mut rewriter = FieldProjectionRewriter::new(schema.clone()); // check for error in rewrite_field_exprs rewriter.rewrite_field_exprs(input)?; // check for error adding to predicate @@ -500,7 +499,7 @@ mod tests { query_functions::regex_not_match_expr(arg, pattern.into()) } - fn make_schema() -> Arc { + fn make_schema() -> Schema { SchemaBuilder::new() .tag("foo") .tag("bar") @@ -513,7 +512,6 @@ mod tests { .field("f4", DataType::Float64) .unwrap() .build() - .map(Arc::new) .unwrap() } } diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index 8032be7426..7d862c97ed 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -206,7 +206,7 @@ impl NamespaceCache { #[derive(Debug, Clone, PartialEq, Eq)] pub struct CachedTable { pub id: TableId, - pub schema: Arc, + pub schema: Schema, pub column_id_map: HashMap>, pub column_id_map_rev: HashMap, ColumnId>, pub primary_key_column_ids: Vec, @@ -242,7 +242,7 @@ impl From for CachedTable { column_id_map.shrink_to_fit(); let id = table.id; - let schema: Arc = Arc::new(table.try_into().expect("Catalog table schema broken")); + let schema: Schema = table.try_into().expect("Catalog table schema broken"); let mut column_id_map_rev: HashMap, ColumnId> = column_id_map .iter() @@ -368,15 +368,13 @@ mod tests { Arc::from("table1"), Arc::new(CachedTable { id: table11.table.id, - schema: Arc::new( - SchemaBuilder::new() - .field("col1", DataType::Int64) - .unwrap() - .tag("col2") - .timestamp() - .build() - .unwrap(), - ), + schema: SchemaBuilder::new() + .field("col1", DataType::Int64) + .unwrap() + .tag("col2") + .timestamp() + .build() + .unwrap(), column_id_map: HashMap::from([ (col111.column.id, Arc::from(col111.column.name.clone())), (col112.column.id, Arc::from(col112.column.name.clone())), @@ -394,14 +392,12 @@ mod tests { Arc::from("table2"), Arc::new(CachedTable { id: table12.table.id, - schema: Arc::new( - SchemaBuilder::new() - .field("col1", DataType::Float64) - .unwrap() - .timestamp() - .build() - .unwrap(), - ), + schema: SchemaBuilder::new() + .field("col1", DataType::Float64) + .unwrap() + .timestamp() + .build() + .unwrap(), column_id_map: HashMap::from([ (col121.column.id, Arc::from(col121.column.name.clone())), (col122.column.id, Arc::from(col122.column.name.clone())), @@ -433,7 +429,7 @@ mod tests { Arc::from("table1"), Arc::new(CachedTable { id: table21.table.id, - schema: Arc::new(SchemaBuilder::new().timestamp().build().unwrap()), + schema: SchemaBuilder::new().timestamp().build().unwrap(), column_id_map: HashMap::from([( col211.column.id, Arc::from(col211.column.name.clone()), diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 5640db4fbe..c62f92da06 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -532,7 +532,7 @@ mod tests { assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); } - fn schema() -> Arc { - Arc::new(SchemaBuilder::new().build().unwrap()) + fn schema() -> Schema { + SchemaBuilder::new().build().unwrap() } } diff --git 
a/querier/src/cache/projected_schema.rs b/querier/src/cache/projected_schema.rs index 1408d609db..4213324293 100644 --- a/querier/src/cache/projected_schema.rs +++ b/querier/src/cache/projected_schema.rs @@ -58,7 +58,7 @@ impl CacheKey { type CacheT = Box< dyn Cache< K = CacheKey, - V = Arc, + V = Schema, GetExtra = (Arc, Option), PeekExtra = ((), Option), >, @@ -97,12 +97,10 @@ impl ProjectedSchemaCache { // order by name since IDs are rather arbitrary projection.sort(); - Arc::new( - table - .schema - .select_by_names(&projection) - .expect("Bug in schema projection"), - ) + table + .schema + .select_by_names(&projection) + .expect("Bug in schema projection") }); let loader = Arc::new(MetricsLoader::new( loader, @@ -117,7 +115,7 @@ impl ProjectedSchemaCache { backend.add_policy(LruPolicy::new( Arc::clone(&ram_pool), CACHE_ID, - Arc::new(FunctionEstimator::new(|k: &CacheKey, v: &Arc| { + Arc::new(FunctionEstimator::new(|k: &CacheKey, v: &Schema| { RamSize(k.size() + size_of_val(v) + v.estimate_size()) })), )); @@ -148,7 +146,7 @@ impl ProjectedSchemaCache { table: Arc, projection: Vec, span: Option, - ) -> Arc { + ) -> Schema { let key = CacheKey::new(table.id, projection); self.cache.get(key, (table, span)).await @@ -176,25 +174,21 @@ mod tests { let table_id_1 = TableId::new(1); let table_id_2 = TableId::new(2); - let table_schema_a = Arc::new( - SchemaBuilder::new() - .tag("t1") - .tag("t2") - .tag("t3") - .timestamp() - .build() - .unwrap(), - ); - let table_schema_b = Arc::new( - SchemaBuilder::new() - .tag("t1") - .tag("t2") - .tag("t3") - .tag("t4") - .timestamp() - .build() - .unwrap(), - ); + let table_schema_a = SchemaBuilder::new() + .tag("t1") + .tag("t2") + .tag("t3") + .timestamp() + .build() + .unwrap(); + let table_schema_b = SchemaBuilder::new() + .tag("t1") + .tag("t2") + .tag("t3") + .tag("t4") + .timestamp() + .build() + .unwrap(); let column_id_map_a = HashMap::from([ (ColumnId::new(1), Arc::from("t1")), (ColumnId::new(2), Arc::from("t2")), @@ -210,7 +204,7 @@ mod tests { ]); let table_1a = Arc::new(CachedTable { id: table_id_1, - schema: Arc::clone(&table_schema_a), + schema: table_schema_a.clone(), column_id_map: column_id_map_a.clone(), column_id_map_rev: reverse_map(&column_id_map_a), primary_key_column_ids: vec![ @@ -222,7 +216,7 @@ mod tests { }); let table_1b = Arc::new(CachedTable { id: table_id_1, - schema: Arc::clone(&table_schema_b), + schema: table_schema_b.clone(), column_id_map: column_id_map_b.clone(), column_id_map_rev: reverse_map(&column_id_map_b), primary_key_column_ids: vec![ @@ -234,7 +228,7 @@ mod tests { }); let table_2a = Arc::new(CachedTable { id: table_id_2, - schema: Arc::clone(&table_schema_a), + schema: table_schema_a.clone(), column_id_map: column_id_map_a.clone(), column_id_map_rev: reverse_map(&column_id_map_a), primary_key_column_ids: vec![ @@ -247,7 +241,7 @@ mod tests { }); // initial request - let expected = Arc::new(SchemaBuilder::new().tag("t1").tag("t2").build().unwrap()); + let expected = SchemaBuilder::new().tag("t1").tag("t2").build().unwrap(); let projection_1 = cache .get( Arc::clone(&table_1a), @@ -265,7 +259,7 @@ mod tests { None, ) .await; - assert!(Arc::ptr_eq(&projection_1, &projection_2)); + assert!(Arc::ptr_eq(projection_1.inner(), projection_2.inner())); // updated table schema let projection_3 = cache @@ -275,7 +269,7 @@ mod tests { None, ) .await; - assert!(Arc::ptr_eq(&projection_1, &projection_3)); + assert!(Arc::ptr_eq(projection_1.inner(), projection_3.inner())); // different column order let projection_4 = 
cache @@ -285,10 +279,10 @@ mod tests { None, ) .await; - assert!(Arc::ptr_eq(&projection_1, &projection_4)); + assert!(Arc::ptr_eq(projection_1.inner(), projection_4.inner())); // different columns set - let expected = Arc::new(SchemaBuilder::new().tag("t1").tag("t3").build().unwrap()); + let expected = SchemaBuilder::new().tag("t1").tag("t3").build().unwrap(); let projection_5 = cache .get( Arc::clone(&table_1a), @@ -307,7 +301,7 @@ mod tests { ) .await; assert_eq!(projection_6, projection_1); - assert!(!Arc::ptr_eq(&projection_1, &projection_6)); + assert!(!Arc::ptr_eq(projection_1.inner(), projection_6.inner())); // original data still present let projection_7 = cache @@ -317,7 +311,7 @@ mod tests { None, ) .await; - assert!(Arc::ptr_eq(&projection_1, &projection_7)); + assert!(Arc::ptr_eq(projection_1.inner(), projection_7.inner())); } fn reverse_map(map: &HashMap) -> HashMap diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 8e5823f826..992cbb30f7 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -617,7 +617,7 @@ impl IngesterStreamDecoder { .take() .expect("Partition should have been checked before chunk creation"); self.current_partition = - Some(current_partition.try_add_chunk(ChunkId::new(), Arc::new(schema), batches)?); + Some(current_partition.try_add_chunk(ChunkId::new(), schema, batches)?); } Ok(()) @@ -1043,7 +1043,7 @@ impl IngesterPartition { pub(crate) fn try_add_chunk( mut self, chunk_id: ChunkId, - expected_schema: Arc, + expected_schema: Schema, batches: Vec, ) -> Result { // ignore chunk if there are no batches @@ -1059,7 +1059,7 @@ impl IngesterPartition { // details) let batches = batches .into_iter() - .map(|batch| ensure_schema(batch, expected_schema.as_ref())) + .map(|batch| ensure_schema(batch, &expected_schema)) .collect::>>()?; // TODO: may want to ask the Ingester to send this value instead of computing it here. @@ -1141,7 +1141,7 @@ impl IngesterPartition { pub struct IngesterChunk { chunk_id: ChunkId, partition_id: PartitionId, - schema: Arc, + schema: Schema, /// Partition-wide sort key. 
partition_sort_key: Option>, @@ -1198,9 +1198,9 @@ impl QueryChunkMeta for IngesterChunk { Arc::clone(&self.summary) } - fn schema(&self) -> Arc { + fn schema(&self) -> &Schema { trace!(schema=?self.schema, "IngesterChunk schema"); - Arc::clone(&self.schema) + &self.schema } fn partition_sort_key(&self) -> Option<&SortKey> { @@ -2011,16 +2011,14 @@ mod tests { .await } - fn schema() -> Arc { - Arc::new( - SchemaBuilder::new() - .influx_field("bar", InfluxFieldType::Float) - .influx_field("baz", InfluxFieldType::Float) - .influx_field("foo", InfluxFieldType::Float) - .timestamp() - .build() - .unwrap(), - ) + fn schema() -> Schema { + SchemaBuilder::new() + .influx_field("bar", InfluxFieldType::Float) + .influx_field("baz", InfluxFieldType::Float) + .influx_field("foo", InfluxFieldType::Float) + .timestamp() + .build() + .unwrap() } fn lp_to_record_batch(lp: &str) -> RecordBatch { @@ -2165,7 +2163,7 @@ mod tests { #[test] fn test_ingester_partition_type_cast() { - let expected_schema = Arc::new(SchemaBuilder::new().tag("t").timestamp().build().unwrap()); + let expected_schema = SchemaBuilder::new().tag("t").timestamp().build().unwrap(); let cases = vec![ // send a batch that matches the schema exactly @@ -2188,7 +2186,7 @@ mod tests { tombstone_max_sequence_number, None, ) - .try_add_chunk(ChunkId::new(), Arc::clone(&expected_schema), vec![case]) + .try_add_chunk(ChunkId::new(), expected_schema.clone(), vec![case]) .unwrap(); for batch in &ingester_partition.chunks[0].batches { @@ -2199,15 +2197,12 @@ mod tests { #[test] fn test_ingester_partition_fail_type_cast() { - let expected_schema = Arc::new( - SchemaBuilder::new() - .field("b", DataType::Boolean) - .unwrap() - .timestamp() - .build() - .unwrap(), - ); - + let expected_schema = SchemaBuilder::new() + .field("b", DataType::Boolean) + .unwrap() + .timestamp() + .build() + .unwrap(); let batch = RecordBatch::try_from_iter(vec![("b", int64_array()), ("time", ts_array())]).unwrap(); @@ -2223,7 +2218,7 @@ mod tests { tombstone_max_sequence_number, None, ) - .try_add_chunk(ChunkId::new(), Arc::clone(&expected_schema), vec![batch]) + .try_add_chunk(ChunkId::new(), expected_schema, vec![batch]) .unwrap_err(); assert_matches!(err, Error::RecordBatchType { .. 
}); diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs index 3320b46958..7a0c5a7149 100644 --- a/querier/src/ingester/test_util.rs +++ b/querier/src/ingester/test_util.rs @@ -91,18 +91,17 @@ impl IngesterConnection for MockIngesterConnection { let total_row_count = batches.iter().map(|b| b.num_rows()).sum::() as u64; + let summary = + create_basic_summary(total_row_count, &new_schema, ic.ts_min_max); + super::IngesterChunk { chunk_id: ic.chunk_id, partition_id: ic.partition_id, - schema: Arc::new(new_schema.clone()), + schema: new_schema, partition_sort_key: ic.partition_sort_key, batches, ts_min_max: ic.ts_min_max, - summary: Arc::new(create_basic_summary( - total_row_count, - &new_schema, - ic.ts_min_max, - )), + summary: Arc::new(summary), } }) .collect::>(); diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index 9f85b1d57a..c595570a8f 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -72,7 +72,7 @@ impl QuerierNamespace { namespace_retention_period: ns.retention_period, table_id: cached_table.id, table_name: Arc::clone(table_name), - schema: Arc::clone(&cached_table.schema), + schema: cached_table.schema.clone(), ingester_connection: ingester_connection.clone(), chunk_adapter: Arc::clone(&chunk_adapter), exec: Arc::clone(&exec), @@ -184,7 +184,7 @@ mod tests { let qns = querier_namespace(&ns).await; let expected_schema = SchemaBuilder::new().build().unwrap(); let actual_schema = schema(&qns, "table"); - assert_eq!(actual_schema.as_ref(), &expected_schema,); + assert_eq!(actual_schema, &expected_schema,); table.create_column("col1", ColumnType::I64).await; table.create_column("col2", ColumnType::Bool).await; @@ -197,7 +197,7 @@ mod tests { .build() .unwrap(); let actual_schema = schema(&qns, "table"); - assert_eq!(actual_schema.as_ref(), &expected_schema,); + assert_eq!(actual_schema, &expected_schema); table.create_column("col4", ColumnType::Tag).await; table @@ -213,7 +213,7 @@ mod tests { .build() .unwrap(); let actual_schema = schema(&qns, "table"); - assert_eq!(actual_schema.as_ref(), &expected_schema,); + assert_eq!(actual_schema, &expected_schema); } fn sorted(mut v: Vec) -> Vec @@ -234,7 +234,7 @@ mod tests { ) } - fn schema(querier_namespace: &QuerierNamespace, table: &str) -> Arc { - Arc::clone(querier_namespace.tables.get(table).unwrap().schema()) + fn schema<'a>(querier_namespace: &'a QuerierNamespace, table: &str) -> &'a Schema { + querier_namespace.tables.get(table).unwrap().schema() } } diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index 0cb71ce49a..da8b76aed1 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -31,8 +31,8 @@ impl QueryNamespaceMeta for QuerierNamespace { names } - fn table_schema(&self, table_name: &str) -> Option> { - self.tables.get(table_name).map(|t| Arc::clone(t.schema())) + fn table_schema(&self, table_name: &str) -> Option { + self.tables.get(table_name).map(|t| t.schema().clone()) } } diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs index f3ed8cbdb5..3bbe8d3248 100644 --- a/querier/src/parquet/creation.rs +++ b/querier/src/parquet/creation.rs @@ -84,11 +84,7 @@ impl ChunkAdapter { .collect(); // Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks - let keeps = match prune_summaries( - Arc::clone(&cached_table.schema), - &basic_summaries, - predicate, - ) { + let keeps = match 
prune_summaries(&cached_table.schema, &basic_summaries, predicate) { Ok(keeps) => keeps, Err(reason) => { // Ignore pruning failures here - the chunk pruner should have already logged them. diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index 3544f24055..585bb4f5f8 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -6,7 +6,7 @@ use data_types::{ }; use iox_query::util::create_basic_summary; use parquet_file::chunk::ParquetChunk; -use schema::{sort::SortKey, Schema}; +use schema::sort::SortKey; use std::sync::Arc; mod creation; @@ -84,9 +84,6 @@ pub struct QuerierParquetChunk { /// Immutable chunk metadata meta: Arc, - /// Schema of the chunk - schema: Arc, - /// Delete predicates to be combined with the chunk delete_predicates: Vec>, @@ -107,11 +104,9 @@ impl QuerierParquetChunk { meta: Arc, partition_sort_key: Option>, ) -> Self { - let schema = parquet_chunk.schema(); - let table_summary = Arc::new(create_basic_summary( parquet_chunk.rows() as u64, - &parquet_chunk.schema(), + parquet_chunk.schema(), parquet_chunk.timestamp_min_max(), )); @@ -119,7 +114,6 @@ impl QuerierParquetChunk { meta, delete_predicates: Vec::new(), partition_sort_key, - schema, parquet_chunk, table_summary, } @@ -341,7 +335,7 @@ pub mod tests { .build() .unwrap(); let actual_schema = chunk.schema(); - assert_eq!(actual_schema.as_ref(), &expected_schema); + assert_eq!(actual_schema, &expected_schema); } fn assert_sort_key(chunk: &QuerierParquetChunk) { diff --git a/querier/src/parquet/query_access.rs b/querier/src/parquet/query_access.rs index 4a2b89c3fa..758c7dc41c 100644 --- a/querier/src/parquet/query_access.rs +++ b/querier/src/parquet/query_access.rs @@ -14,8 +14,8 @@ impl QueryChunkMeta for QuerierParquetChunk { Arc::clone(&self.table_summary) } - fn schema(&self) -> Arc { - Arc::clone(&self.schema) + fn schema(&self) -> &Schema { + self.parquet_chunk.schema() } fn partition_sort_key(&self) -> Option<&SortKey> { diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index bd923c28ed..e40cd5285b 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -76,7 +76,7 @@ pub struct QuerierTableArgs { pub namespace_retention_period: Option, pub table_id: TableId, pub table_name: Arc, - pub schema: Arc, + pub schema: Schema, pub ingester_connection: Option>, pub chunk_adapter: Arc, pub exec: Arc, @@ -106,7 +106,7 @@ pub struct QuerierTable { table_id: TableId, /// Table schema. - schema: Arc, + schema: Schema, /// Connection to ingester ingester_connection: Option>, @@ -165,7 +165,7 @@ impl QuerierTable { } /// Schema. 
- pub fn schema(&self) -> &Arc { + pub fn schema(&self) -> &Schema { &self.schema } @@ -372,7 +372,7 @@ impl QuerierTable { .prune_chunks( self.table_name(), // use up-to-date schema - Arc::clone(&cached_table.schema), + &cached_table.schema, chunks, &predicate, ) @@ -848,7 +848,7 @@ mod tests { let schema = make_schema_two_fields_two_tags(&table).await; // let add a partion from the ingester - let builder = IngesterPartitionBuilder::new(&schema, &shard, &partition) + let builder = IngesterPartitionBuilder::new(schema, &shard, &partition) .with_lp(["table,tag1=val1,tag2=val2 foo=3,bar=4 11"]); let ingester_partition = @@ -945,7 +945,7 @@ mod tests { .with_compaction_level(CompactionLevel::FileNonOverlapped); partition.create_parquet_file(builder).await; - let builder = IngesterPartitionBuilder::new(&schema, &shard, &partition); + let builder = IngesterPartitionBuilder::new(schema, &shard, &partition); let ingester_partition = builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(1))); @@ -1015,18 +1015,16 @@ mod tests { .create_tombstone(12, 1, 100, "foo=3") .await; - let schema = Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ); + let schema = SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(); let ingester_chunk_id1 = u128::MAX - 1; - let builder1 = IngesterPartitionBuilder::new(&schema, &shard, &partition1); - let builder2 = IngesterPartitionBuilder::new(&schema, &shard, &partition2); + let builder1 = IngesterPartitionBuilder::new(schema.clone(), &shard, &partition1); + let builder2 = IngesterPartitionBuilder::new(schema, &shard, &partition2); let querier_table = TestQuerierTable::new(&catalog, &table) .await .with_ingester_partition( @@ -1111,16 +1109,14 @@ mod tests { let partition1 = table.with_shard(&shard).create_partition("k1").await; let partition2 = table.with_shard(&shard).create_partition("k2").await; - let schema = Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Integer) - .timestamp() - .build() - .unwrap(), - ); + let schema = SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Integer) + .timestamp() + .build() + .unwrap(); - let builder1 = IngesterPartitionBuilder::new(&schema, &shard, &partition1); - let builder2 = IngesterPartitionBuilder::new(&schema, &shard, &partition2); + let builder1 = IngesterPartitionBuilder::new(schema.clone(), &shard, &partition1); + let builder2 = IngesterPartitionBuilder::new(schema, &shard, &partition2); let querier_table = TestQuerierTable::new(&catalog, &table) .await @@ -1172,7 +1168,7 @@ mod tests { let schema = make_schema(&table).await; let builder = - IngesterPartitionBuilder::new(&schema, &shard, &partition).with_lp(["table foo=1 1"]); + IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]); // Parquet file between with max sequence number 2 let pf_builder = TestParquetFileBuilder::default() @@ -1230,7 +1226,7 @@ mod tests { let querier_table = TestQuerierTable::new(&catalog, &table).await; let builder = - IngesterPartitionBuilder::new(&schema, &shard, &partition).with_lp(["table foo=1 1"]); + IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]); // parquet file with max sequence number 1 let pf_builder = TestParquetFileBuilder::default() @@ -1292,7 +1288,7 @@ mod tests { let querier_table = TestQuerierTable::new(&catalog, &table).await; let builder = - 
IngesterPartitionBuilder::new(&schema, &shard, &partition).with_lp(["table foo=1 1"]); + IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]); // parquet file with max sequence number 1 let pf_builder = TestParquetFileBuilder::default() @@ -1319,20 +1315,18 @@ mod tests { } /// Adds a "foo" column to the table and returns the created schema - async fn make_schema(table: &Arc) -> Arc { + async fn make_schema(table: &Arc) -> Schema { table.create_column("foo", ColumnType::F64).await; table.create_column("time", ColumnType::Time).await; // create corresponding schema - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Float) - .timestamp() - .build() - .unwrap(), - ) + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Float) + .timestamp() + .build() + .unwrap() } - async fn make_schema_two_fields_two_tags(table: &Arc) -> Arc { + async fn make_schema_two_fields_two_tags(table: &Arc) -> Schema { table.create_column("time", ColumnType::Time).await; table.create_column("foo", ColumnType::F64).await; table.create_column("bar", ColumnType::F64).await; @@ -1340,16 +1334,14 @@ mod tests { table.create_column("tag2", ColumnType::Tag).await; // create corresponding schema - Arc::new( - SchemaBuilder::new() - .influx_field("foo", InfluxFieldType::Float) - .influx_field("bar", InfluxFieldType::Float) - .tag("tag1") - .tag("tag2") - .timestamp() - .build() - .unwrap(), - ) + SchemaBuilder::new() + .influx_field("foo", InfluxFieldType::Float) + .influx_field("bar", InfluxFieldType::Float) + .tag("tag1") + .tag("tag2") + .timestamp() + .build() + .unwrap() } /// A `QuerierTable` and some number of `IngesterPartitions` that diff --git a/querier/src/table/query_access/mod.rs b/querier/src/table/query_access/mod.rs index 80933e7a37..4ca1f2d0d4 100644 --- a/querier/src/table/query_access/mod.rs +++ b/querier/src/table/query_access/mod.rs @@ -54,7 +54,7 @@ impl TableProvider for QuerierTable { let mut builder = ProviderBuilder::new( Arc::clone(self.table_name()), - Arc::clone(self.schema()), + self.schema().clone(), iox_ctx, ); @@ -108,7 +108,7 @@ impl ChunkPruner for QuerierTableChunkPruner { fn prune_chunks( &self, _table_name: &str, - table_schema: Arc, + table_schema: &Schema, chunks: Vec>, predicate: &Predicate, ) -> Result>, ProviderError> { diff --git a/querier/src/table/test_util.rs b/querier/src/table/test_util.rs index e5cf1daedf..a20f5217a5 100644 --- a/querier/src/table/test_util.rs +++ b/querier/src/table/test_util.rs @@ -29,7 +29,7 @@ pub async fn querier_table(catalog: &Arc, table: &Arc) - .await .unwrap(); let schema = catalog_schema.tables.remove(&table.table.name).unwrap(); - let schema = Arc::new(Schema::try_from(schema).unwrap()); + let schema = Schema::try_from(schema).unwrap(); let namespace_name = Arc::from(table.namespace.namespace.name.as_str()); @@ -63,7 +63,7 @@ pub(crate) fn lp_to_record_batch(lp: &str) -> RecordBatch { /// Helper for creating IngesterPartitions #[derive(Debug, Clone)] pub(crate) struct IngesterPartitionBuilder { - schema: Arc, + schema: Schema, shard: Arc, partition: Arc, ingester_name: Arc, @@ -77,12 +77,12 @@ pub(crate) struct IngesterPartitionBuilder { impl IngesterPartitionBuilder { pub(crate) fn new( - schema: &Arc, + schema: Schema, shard: &Arc, partition: &Arc, ) -> Self { Self { - schema: Arc::clone(schema), + schema, shard: Arc::clone(shard), partition: Arc::clone(partition), ingester_name: Arc::from("ingester1"), @@ -135,7 +135,7 @@ impl IngesterPartitionBuilder { ) .try_add_chunk( 
ChunkId::new_test(self.ingester_chunk_id), - Arc::clone(&self.schema), + self.schema.clone(), data, ) .unwrap() diff --git a/schema/src/interner.rs b/schema/src/interner.rs index c78f2604a3..c1895e6ea6 100644 --- a/schema/src/interner.rs +++ b/schema/src/interner.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc}; +use std::collections::HashSet; use crate::Schema; @@ -10,7 +10,7 @@ use crate::Schema; /// [Interning]: https://en.wikipedia.org/wiki/Interning_(computer_science) #[derive(Debug, Default)] pub struct SchemaInterner { - schemas: HashSet>, + schemas: HashSet, } impl SchemaInterner { @@ -20,12 +20,11 @@ impl SchemaInterner { } /// Intern schema. - pub fn intern(&mut self, schema: Schema) -> Arc { + pub fn intern(&mut self, schema: Schema) -> Schema { if let Some(schema) = self.schemas.get(&schema) { - Arc::clone(schema) + schema.clone() } else { - let schema = Arc::new(schema); - self.schemas.insert(Arc::clone(&schema)); + self.schemas.insert(schema.clone()); schema } } @@ -34,6 +33,7 @@ impl SchemaInterner { #[cfg(test)] mod tests { use crate::builder::SchemaBuilder; + use std::sync::Arc; use super::*; @@ -46,12 +46,12 @@ mod tests { let schema_2 = SchemaBuilder::new().tag("t1").tag("t3").build().unwrap(); let interned_1a = interner.intern(schema_1a.clone()); - assert_eq!(interned_1a.as_ref(), &schema_1a); + assert_eq!(interned_1a, schema_1a); let interned_1b = interner.intern(schema_1b); - assert!(Arc::ptr_eq(&interned_1a, &interned_1b)); + assert!(Arc::ptr_eq(interned_1a.inner(), interned_1b.inner())); let interned_2 = interner.intern(schema_2.clone()); - assert_eq!(interned_2.as_ref(), &schema_2); + assert_eq!(interned_2, schema_2); } } diff --git a/schema/src/merge.rs b/schema/src/merge.rs index 8b718da069..139258e941 100644 --- a/schema/src/merge.rs +++ b/schema/src/merge.rs @@ -44,7 +44,7 @@ pub type Result = std::result::Result; /// This is infallable because the schemas of chunks within a /// partition are assumed to be compatible because that schema was /// enforced as part of writing into the partition -pub fn merge_record_batch_schemas(batches: &[Arc]) -> Arc { +pub fn merge_record_batch_schemas(batches: &[Arc]) -> Schema { let mut merger = SchemaMerger::new(); for batch in batches { let schema = Schema::try_from(batch.schema()).expect("Schema conversion error"); @@ -154,7 +154,7 @@ impl<'a> SchemaMerger<'a> { } /// Returns the schema that was built, the columns are always sorted in lexicographic order - pub fn build(mut self) -> Arc { + pub fn build(mut self) -> Schema { let schema = Schema::new_from_parts( self.measurement.take(), self.fields.drain().map(|x| x.1), @@ -165,7 +165,7 @@ impl<'a> SchemaMerger<'a> { if let Some(interner) = self.interner.as_mut() { interner.intern(schema) } else { - Arc::new(schema) + schema } } } @@ -198,8 +198,8 @@ mod tests { .unwrap() .build(); - assert_eq!(merged_schema.as_ref(), &schema1); - assert_eq!(merged_schema.as_ref(), &schema2); + assert_eq!(merged_schema, schema1); + assert_eq!(merged_schema, schema2); } #[test] @@ -239,11 +239,9 @@ mod tests { .sort_fields_by_name(); assert_eq!( - &expected_schema, - merged_schema.as_ref(), + expected_schema, merged_schema, "\nExpected:\n{:#?}\nActual:\n{:#?}", - expected_schema, - merged_schema + expected_schema, merged_schema ); } @@ -269,11 +267,9 @@ mod tests { .unwrap(); assert_eq!( - &expected_schema, - merged_schema.as_ref(), + expected_schema, merged_schema, "\nExpected:\n{:#?}\nActual:\n{:#?}", - expected_schema, - merged_schema + expected_schema, merged_schema ); } @@ 
-303,11 +299,9 @@ mod tests { .unwrap(); assert_eq!( - &expected_schema, - merged_schema.as_ref(), + expected_schema, merged_schema, "\nExpected:\n{:#?}\nActual:\n{:#?}", - expected_schema, - merged_schema + expected_schema, merged_schema ); } @@ -335,11 +329,9 @@ mod tests { .unwrap(); assert_eq!( - &expected_schema, - merged_schema.as_ref(), + expected_schema, merged_schema, "\nExpected:\n{:#?}\nActual:\n{:#?}", - expected_schema, - merged_schema + expected_schema, merged_schema ); } @@ -428,6 +420,9 @@ mod tests { .build(); assert_eq!(merged_schema_a, merged_schema_b); - assert!(Arc::ptr_eq(&merged_schema_a, &merged_schema_b)); + assert!(Arc::ptr_eq( + merged_schema_a.inner(), + merged_schema_b.inner() + )); } } diff --git a/service_grpc_influxrpc/src/expr.rs b/service_grpc_influxrpc/src/expr.rs index 7bab82bb08..e343e77f79 100644 --- a/service_grpc_influxrpc/src/expr.rs +++ b/service_grpc_influxrpc/src/expr.rs @@ -894,7 +894,7 @@ mod tests { use generated_types::node::Type as RPCNodeType; use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; use schema::{Schema, SchemaBuilder}; - use std::{collections::BTreeSet, sync::Arc}; + use std::collections::BTreeSet; use test_helpers::assert_contains; use super::*; @@ -915,7 +915,7 @@ mod tests { self.table_names.clone() } - fn table_schema(&self, table_name: &str) -> Option> { + fn table_schema(&self, table_name: &str) -> Option { match table_name { "foo" => { let schema = SchemaBuilder::new() @@ -929,7 +929,7 @@ mod tests { .build() .unwrap(); - Some(Arc::new(schema)) + Some(schema) } "bar" => { let schema = SchemaBuilder::new() @@ -939,7 +939,7 @@ mod tests { .build() .unwrap(); - Some(Arc::new(schema)) + Some(schema) } _ => None, }
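
The hunks above consistently drop the `Arc` wrapper around `Schema` (returning `Schema` or `&Schema`, cloning instead of `Arc::clone`, and comparing interned schemas via `inner()` pointer equality). A minimal, self-contained sketch of why that stays cheap, assuming `Schema` wraps a shared `Arc` internally; the `ArrowSchema`, `Schema`, and `SchemaInterner` types below are hypothetical stand-ins for illustration, not the actual IOx definitions:

```rust
use std::{collections::HashSet, sync::Arc};

/// Hypothetical stand-in for the arrow schema payload.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ArrowSchema {
    column_names: Vec<String>,
}

/// Cheaply cloneable schema: cloning only bumps the inner `Arc`'s refcount,
/// so functions can take `Schema` by value or `&Schema` without an outer `Arc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Schema {
    inner: Arc<ArrowSchema>,
}

impl Schema {
    fn new(column_names: impl IntoIterator<Item = &'static str>) -> Self {
        Self {
            inner: Arc::new(ArrowSchema {
                column_names: column_names.into_iter().map(|s| s.to_owned()).collect(),
            }),
        }
    }

    /// Expose the shared inner allocation, e.g. for `Arc::ptr_eq` checks.
    fn inner(&self) -> &Arc<ArrowSchema> {
        &self.inner
    }
}

/// Hypothetical interner: equal schemas are deduplicated so callers end up
/// sharing one inner allocation, which pointer-equality asserts can verify.
#[derive(Debug, Default)]
struct SchemaInterner {
    schemas: HashSet<Schema>,
}

impl SchemaInterner {
    fn intern(&mut self, schema: Schema) -> Schema {
        if let Some(existing) = self.schemas.get(&schema) {
            existing.clone()
        } else {
            self.schemas.insert(schema.clone());
            schema
        }
    }
}

fn main() {
    let mut interner = SchemaInterner::default();

    let a = interner.intern(Schema::new(["t1", "t2", "time"]));
    let b = interner.intern(Schema::new(["t1", "t2", "time"]));

    // Equal schemas intern to the same shared allocation.
    assert!(Arc::ptr_eq(a.inner(), b.inner()));

    // Passing `Schema` by value is still just a refcount bump.
    let c = a.clone();
    assert_eq!(Arc::strong_count(a.inner()), 4); // a, b, c, plus the interner's stored copy
    drop(c);
}
```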