Merge branch 'main' into alamb-patch-1

pull/24376/head
kodiakhq[bot] 2023-01-06 11:16:52 +00:00 committed by GitHub
commit 9a0db0ec30
42 changed files with 413 additions and 479 deletions

View File

@ -356,7 +356,7 @@ impl CompactPlanBuilder {
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
.compact_plan(
Arc::from(partition.table.name.clone()),
Arc::clone(&merged_schema),
&merged_schema,
query_chunks,
sort_key.clone(),
)
@ -384,7 +384,7 @@ impl CompactPlanBuilder {
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
.compact_plan(
Arc::from(partition.table.name.clone()),
Arc::clone(&merged_schema),
&merged_schema,
query_chunks,
sort_key.clone(),
)
@ -394,7 +394,7 @@ impl CompactPlanBuilder {
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
.split_plan(
Arc::from(partition.table.name.clone()),
Arc::clone(&merged_schema),
&merged_schema,
query_chunks,
sort_key.clone(),
split_times,
@ -537,7 +537,7 @@ impl CompactPlanBuilder {
let plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
.compact_plan(
Arc::from(partition.table.name.clone()),
Arc::clone(&merged_schema),
&merged_schema,
query_chunks,
sort_key.clone(),
)
@ -769,7 +769,7 @@ fn to_queryable_parquet_chunk(
.map(|sk| sk.filter_to(&pk, file.partition_id().get()));
let file = Arc::new(ParquetFile::from(file));
let parquet_chunk = ParquetChunk::new(Arc::clone(&file), Arc::new(schema), store);
let parquet_chunk = ParquetChunk::new(Arc::clone(&file), schema, store);
trace!(
parquet_file_id=?file.id,

View File

@ -60,7 +60,7 @@ impl QueryableParquetChunk {
let delete_predicates = tombstones_to_delete_predicates(deletes);
let summary = Arc::new(create_basic_summary(
data.rows() as u64,
&data.schema(),
data.schema(),
data.timestamp_min_max(),
));
Self {
@ -79,10 +79,10 @@ impl QueryableParquetChunk {
}
/// Merge schema of the given chunks
pub fn merge_schemas(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
pub fn merge_schemas(chunks: &[Arc<dyn QueryChunk>]) -> Schema {
let mut merger = SchemaMerger::new();
for chunk in chunks {
merger = merger.merge(&chunk.schema()).expect("schemas compatible");
merger = merger.merge(chunk.schema()).expect("schemas compatible");
}
merger.build()
}
@ -113,7 +113,7 @@ impl QueryChunkMeta for QueryableParquetChunk {
Arc::clone(&self.summary)
}
fn schema(&self) -> Arc<Schema> {
fn schema(&self) -> &Schema {
self.data.schema()
}
@ -259,7 +259,7 @@ mod tests {
let parquet_chunk = Arc::new(ParquetChunk::new(
Arc::clone(&parquet_file),
Arc::new(table.schema().await),
table.schema().await,
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
));
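The two files above show the core pattern of this refactor: merge_schemas now returns an owned Schema, and callers lend it to the planner by reference instead of cloning an Arc. A rough sketch of the resulting call site, stitched together from the hunks above (ctx, partition, query_chunks, and sort_key are assumed to be in scope as shown there; this is an illustration, not a line from the diff):

// Hypothetical caller combining merge_schemas with the new &Schema planner API.
let merged_schema: Schema = QueryableParquetChunk::merge_schemas(&query_chunks);
let plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
    .compact_plan(
        Arc::from(partition.table.name.clone()),
        &merged_schema,        // borrowed; no Arc::clone(&merged_schema) needed any more
        query_chunks,
        sort_key.clone(),
    )?;                        // assumes the enclosing function returns a compatible Result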

View File

@ -176,7 +176,7 @@ impl ParquetFileWithTombstone {
.as_ref()
.map(|sk| sk.filter_to(&pk, self.partition_id.get()));
let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), Arc::new(schema), store);
let parquet_chunk = ParquetChunk::new(Arc::clone(&self.data), schema, store);
trace!(
parquet_file_id=?self.id,

View File

@ -227,8 +227,8 @@ where
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
@ -243,13 +243,13 @@ where
}
/// Create a SendableRecordBatchStream from a RecordBatch
pub fn stream_from_batch(schema: Arc<Schema>, batch: RecordBatch) -> SendableRecordBatchStream {
pub fn stream_from_batch(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
stream_from_batches(schema, vec![Arc::new(batch)])
}
/// Create a SendableRecordBatchStream from Vec of RecordBatches with the same schema
pub fn stream_from_batches(
schema: Arc<Schema>,
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
) -> SendableRecordBatchStream {
if batches.is_empty() {

View File

@ -109,7 +109,7 @@ pub(crate) async fn compact_persisting_batch(
}
None => {
let sort_key = compute_sort_key(
batch.schema().as_ref(),
batch.schema(),
batch.record_batches().iter().map(|sb| sb.as_ref()),
);
// Use the sort key computed from the cardinality as the sort key for this parquet
@ -141,7 +141,7 @@ pub(crate) async fn compact(
.compact_plan(
table_name.into(),
data.schema(),
[data as Arc<dyn QueryChunk>],
[Arc::clone(&data) as Arc<dyn QueryChunk>],
sort_key,
)
.context(LogicalPlanSnafu {})?;
@ -525,7 +525,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@ -566,7 +566,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@ -612,7 +612,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@ -658,7 +658,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
@ -708,7 +708,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact

View File

@ -60,7 +60,7 @@ pub(crate) struct QueryAdaptor {
id: ChunkId,
/// An interned schema for all [`RecordBatch`] in data.
schema: OnceCell<Arc<Schema>>,
schema: Schema,
/// An interned table summary.
summary: OnceCell<Arc<TableSummary>>,
@ -80,13 +80,14 @@ impl QueryAdaptor {
// partitions - if there is a QueryAdaptor, it contains data.
assert!(data.iter().map(|b| b.num_rows()).sum::<usize>() > 0);
let schema = merge_record_batch_schemas(&data);
Self {
schema,
data,
partition_id,
// To return a value for debugging and make it consistent with ChunkId created in Compactor,
// use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process.
id: ChunkId::new(),
schema: OnceCell::default(),
summary: OnceCell::default(),
}
}
@ -137,17 +138,14 @@ impl QueryChunkMeta for QueryAdaptor {
Arc::new(create_basic_summary(
self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64,
&self.schema(),
self.schema(),
ts_min_max,
))
}))
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(
self.schema
.get_or_init(|| merge_record_batch_schemas(&self.data)),
)
fn schema(&self) -> &Schema {
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {

View File

@ -68,7 +68,7 @@ pub(super) async fn compact_persisting_batch(
}
None => {
let sort_key = compute_sort_key(
batch.schema().as_ref(),
batch.schema(),
batch.record_batches().iter().map(|sb| sb.as_ref()),
);
// Use the sort key computed from the cardinality as the sort key for this parquet
@ -85,7 +85,7 @@ pub(super) async fn compact_persisting_batch(
.compact_plan(
table_name.into(),
batch.schema(),
[batch as Arc<dyn QueryChunk>],
[Arc::clone(&batch) as Arc<dyn QueryChunk>],
data_sort_key.clone(),
)
.unwrap();
@ -460,7 +460,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@ -501,7 +501,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@ -547,7 +547,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "time"]));
// compact
@ -594,7 +594,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact
@ -645,7 +645,7 @@ mod tests {
assert_eq!(expected_pk, pk);
let sort_key =
compute_sort_key(&schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
compute_sort_key(schema, batch.record_batches().iter().map(|rb| rb.as_ref()));
assert_eq!(sort_key, SortKey::from_columns(["tag1", "tag2", "time"]));
// compact

View File

@ -339,7 +339,7 @@ impl PersistQueue for PersistHandle {
};
// Build the persist task request.
let schema = data.schema();
let schema = data.schema().clone();
let (r, notify) = PersistRequest::new(Arc::clone(&partition), data, permit, enqueued_at);
match sort_key {

View File

@ -39,7 +39,7 @@ pub struct QueryAdaptor {
id: ChunkId,
/// An interned schema for all [`RecordBatch`] in data.
schema: OnceCell<Arc<Schema>>,
schema: Schema,
/// An interned table summary.
summary: OnceCell<Arc<TableSummary>>,
@ -59,13 +59,14 @@ impl QueryAdaptor {
// partitions - if there is a QueryAdaptor, it contains data.
assert!(data.iter().map(|b| b.num_rows()).sum::<usize>() > 0);
let schema = merge_record_batch_schemas(&data);
Self {
data,
partition_id,
// To return a value for debugging and make it consistent with ChunkId created in Compactor,
// use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process.
id: ChunkId::new(),
schema: OnceCell::default(),
schema,
summary: OnceCell::default(),
}
}
@ -116,17 +117,14 @@ impl QueryChunkMeta for QueryAdaptor {
Arc::new(create_basic_summary(
self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64,
&self.schema(),
self.schema(),
ts_min_max,
))
}))
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(
self.schema
.get_or_init(|| merge_record_batch_schemas(&self.data)),
)
fn schema(&self) -> &Schema {
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {

View File

@ -1,3 +1,4 @@
use arrow::datatypes::SchemaRef;
/// Prototype "Flight Client" that handles underlying details of the flight protocol at a higher level
/// Based on the "low level client" from IOx client:
@ -412,7 +413,7 @@ impl futures::Stream for FlightDataStream {
/// streaming flight response.
#[derive(Debug)]
struct FlightStreamState {
schema: Arc<Schema>,
schema: SchemaRef,
dictionaries_by_field: HashMap<i64, ArrayRef>,
}
@ -431,7 +432,7 @@ impl DecodedFlightData {
}
}
pub fn new_schema(inner: FlightData, schema: Arc<Schema>) -> Self {
pub fn new_schema(inner: FlightData, schema: SchemaRef) -> Self {
Self {
inner,
payload: DecodedPayload::Schema(schema),
@ -458,7 +459,7 @@ pub enum DecodedPayload {
None,
/// A decoded Schema message
Schema(Arc<Schema>),
Schema(SchemaRef),
/// A decoded Record batch.
RecordBatch(RecordBatch),
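Note that in this flight-client file, Schema refers to arrow's arrow::datatypes::Schema, not the IOx schema wrapper touched elsewhere in this commit; the change simply adopts arrow's SchemaRef alias, which the arrow crate defines as an Arc over its schema type (shown for context, not part of this diff):

pub type SchemaRef = Arc<Schema>;   // arrow::datatypes::SchemaRef

So FlightStreamState, new_schema, and DecodedPayload::Schema still hold an Arc<Schema>; only the spelling changes.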

View File

@ -66,7 +66,7 @@ mod test {
let ctx = IOxSessionContext::with_testing();
// Build a logical plan with deduplication
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), schema, ctx.child_ctx("scan_plan"))
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan"))
.with_chunks(chunks)
.build()
.unwrap();
@ -114,7 +114,7 @@ mod test {
let ctx = IOxSessionContext::with_testing();
// Build a logical plan without deduplication
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), schema, ctx.child_ctx("scan_plan"))
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan"))
.with_chunks(chunks)
// force it to not deduplicate
.enable_deduplication(false)
@ -178,7 +178,7 @@ mod test {
let ctx = IOxSessionContext::with_testing();
// Build a logical plan without deduplication but sort
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), schema, ctx.child_ctx("scan_plan"))
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan"))
.with_chunks(chunks)
// force it to not deduplicate
.enable_deduplication(false)
@ -230,7 +230,7 @@ mod test {
// Use a split plan as it has StreamSplitExec, DeduplicateExec and IOxReadFilterNode
let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000])
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000])
.expect("created compact plan");
let executor = Executor::new_testing();
@ -361,7 +361,7 @@ mod test {
extractor.inner
}
fn test_chunks(overlapped: bool) -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
fn test_chunks(overlapped: bool) -> (Schema, Vec<Arc<dyn QueryChunk>>) {
let max_time = if overlapped { 70000 } else { 7000 };
let chunk1 = Arc::new(
TestChunk::new("t")
@ -385,20 +385,20 @@ mod test {
);
let schema = SchemaMerger::new()
.merge(&chunk1.schema())
.merge(chunk1.schema())
.unwrap()
.merge(&chunk2.schema())
.merge(chunk2.schema())
.unwrap()
.build();
(schema, vec![chunk1, chunk2])
}
fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
fn get_test_chunks() -> (Schema, Vec<Arc<dyn QueryChunk>>) {
test_chunks(false)
}
fn get_test_overlapped_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
fn get_test_overlapped_chunks() -> (Schema, Vec<Arc<dyn QueryChunk>>) {
test_chunks(true)
}
}

View File

@ -63,7 +63,7 @@ impl std::fmt::Debug for ScanPlan {
impl ScanPlan {
/// Return the schema of the source (the merged schema across all tables)
pub fn schema(&self) -> Arc<Schema> {
pub fn schema(&self) -> &Schema {
self.provider.iox_schema()
}
}
@ -90,7 +90,7 @@ pub struct ScanPlanBuilder<'a> {
table_name: Arc<str>,
/// The schema of the resulting table (any chunks that don't have
/// all the necessary columns will be extended appropriately)
table_schema: Arc<Schema>,
table_schema: &'a Schema,
chunks: Vec<Arc<dyn QueryChunk>>,
/// The sort key that describes the desired output sort order
output_sort_key: Option<SortKey>,
@ -100,7 +100,7 @@ pub struct ScanPlanBuilder<'a> {
}
impl<'a> ScanPlanBuilder<'a> {
pub fn new(table_name: Arc<str>, table_schema: Arc<Schema>, ctx: IOxSessionContext) -> Self {
pub fn new(table_name: Arc<str>, table_schema: &'a Schema, ctx: IOxSessionContext) -> Self {
Self {
ctx,
table_name,
@ -158,7 +158,7 @@ impl<'a> ScanPlanBuilder<'a> {
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(
Arc::clone(&table_name),
table_schema,
table_schema.clone(),
ctx.child_ctx("provider_builder"),
)
.with_enable_deduplication(deduplication);
@ -193,7 +193,7 @@ impl<'a> ScanPlanBuilder<'a> {
// Rewrite expression so it only refers to columns in this chunk
let schema = provider.iox_schema();
trace!(%table_name, ?filter_expr, "Adding filter expr");
let mut rewriter = MissingColumnsToNull::new(&schema);
let mut rewriter = MissingColumnsToNull::new(schema);
let filter_expr =
filter_expr
.rewrite(&mut rewriter)

View File

@ -309,7 +309,7 @@ impl InfluxRpcPlanner {
let plan = Self::table_name_plan(
ctx,
Arc::clone(table_name),
schema,
&schema,
predicate,
chunks,
)?;
@ -472,7 +472,7 @@ impl InfluxRpcPlanner {
let mut ctx = ctx.child_ctx("tag_keys_plan");
ctx.set_metadata("table", table_name.to_string());
let plan = self.tag_keys_plan(ctx, table_name, schema, predicate, chunks_full)?;
let plan = self.tag_keys_plan(ctx, table_name, &schema, predicate, chunks_full)?;
if let Some(plan) = plan {
builder = builder.append_other(plan)
@ -641,7 +641,7 @@ impl InfluxRpcPlanner {
let mut ctx = ctx.child_ctx("scan_and_filter planning");
ctx.set_metadata("table", table_name.to_string());
let scan_and_filter = ScanPlanBuilder::new(Arc::clone(table_name), schema, ctx)
let scan_and_filter = ScanPlanBuilder::new(Arc::clone(table_name), &schema, ctx)
.with_chunks(chunks_full)
.with_predicate(predicate)
.build()?;
@ -884,7 +884,7 @@ impl InfluxRpcPlanner {
Aggregate::None => Self::read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
Arc::clone(&schema),
schema,
predicate,
chunks,
),
@ -963,7 +963,7 @@ impl InfluxRpcPlanner {
&self,
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<StringSetPlan>> {
@ -1029,7 +1029,7 @@ impl InfluxRpcPlanner {
fn field_columns_plan(
ctx: IOxSessionContext,
table_name: Arc<str>,
schema: Arc<Schema>,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<LogicalPlan> {
@ -1085,7 +1085,7 @@ impl InfluxRpcPlanner {
fn table_name_plan(
ctx: IOxSessionContext,
table_name: Arc<str>,
schema: Arc<Schema>,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<LogicalPlan> {
@ -1100,7 +1100,7 @@ impl InfluxRpcPlanner {
.build()?;
// Select only fields requested
let select_exprs: Vec<_> = filtered_fields_iter(&scan_and_filter.schema(), predicate)
let select_exprs: Vec<_> = filtered_fields_iter(scan_and_filter.schema(), predicate)
.map(|field| field.name.as_expr())
.collect();
@ -1130,7 +1130,7 @@ impl InfluxRpcPlanner {
fn read_filter_plan(
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<SeriesSetPlan> {
@ -1143,7 +1143,7 @@ impl InfluxRpcPlanner {
.with_chunks(chunks)
.build()?;
let schema = scan_and_filter.schema();
let schema = scan_and_filter.provider.iox_schema();
let tags_and_timestamp: Vec<_> = scan_and_filter
.schema()
@ -1164,7 +1164,7 @@ impl InfluxRpcPlanner {
let tags_fields_and_timestamps: Vec<Expr> = schema
.tags_iter()
.map(|field| field.name().as_expr())
.chain(filtered_fields_iter(&schema, predicate).map(|f| f.expr))
.chain(filtered_fields_iter(schema, predicate).map(|f| f.expr))
.chain(schema.time_iter().map(|field| field.name().as_expr()))
.collect();
@ -1179,7 +1179,7 @@ impl InfluxRpcPlanner {
.map(|field| Arc::from(field.name().as_str()))
.collect();
let field_columns = filtered_fields_iter(&schema, predicate)
let field_columns = filtered_fields_iter(schema, predicate)
.map(|field| Arc::from(field.name))
.collect();
@ -1238,7 +1238,7 @@ impl InfluxRpcPlanner {
fn read_group_plan(
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
schema: &Schema,
predicate: &Predicate,
agg: Aggregate,
chunks: Vec<Arc<dyn QueryChunk>>,
@ -1255,7 +1255,7 @@ impl InfluxRpcPlanner {
// order the tag columns so that the group keys come first (we
// will group and
// order in the same order)
let schema = scan_and_filter.schema();
let schema = scan_and_filter.provider.iox_schema();
let tag_columns: Vec<_> = schema.tags_iter().map(|f| f.name() as &str).collect();
// Group by all tag columns
@ -1267,7 +1267,7 @@ impl InfluxRpcPlanner {
let AggExprs {
agg_exprs,
field_columns,
} = AggExprs::try_new_for_read_group(agg, &schema, predicate)?;
} = AggExprs::try_new_for_read_group(agg, schema, predicate)?;
let plan_builder = scan_and_filter
.plan_builder
@ -1347,7 +1347,7 @@ impl InfluxRpcPlanner {
fn read_window_aggregate_plan(
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
schema: &Schema,
predicate: &Predicate,
agg: Aggregate,
every: WindowDuration,
@ -1363,7 +1363,7 @@ impl InfluxRpcPlanner {
.with_chunks(chunks)
.build()?;
let schema = scan_and_filter.schema();
let schema = scan_and_filter.provider.iox_schema();
// Group by all tag columns and the window bounds
let window_bound = make_window_bound_expr(TIME_COLUMN_NAME.as_expr(), every, offset)
@ -1378,7 +1378,7 @@ impl InfluxRpcPlanner {
let AggExprs {
agg_exprs,
field_columns,
} = AggExprs::try_new_for_read_window_aggregate(agg, &schema, predicate)?;
} = AggExprs::try_new_for_read_window_aggregate(agg, schema, predicate)?;
// sort by the group by expressions as well
let sort_exprs = group_exprs
@ -1436,7 +1436,7 @@ fn table_chunk_stream<'a>(
let table_schema = namespace.table_schema(table_name);
let projection = match table_schema {
Some(table_schema) => {
columns_in_predicates(need_fields, table_schema, table_name, predicate)
columns_in_predicates(need_fields, &table_schema, table_name, predicate)
}
None => None,
};
@ -1470,7 +1470,7 @@ fn table_chunk_stream<'a>(
// in the predicate.
fn columns_in_predicates(
need_fields: bool,
table_schema: Arc<Schema>,
table_schema: &Schema,
table_name: &str,
predicate: &Predicate,
) -> Option<Vec<usize>> {
@ -1560,7 +1560,7 @@ where
&'a str,
&'a Predicate,
Vec<Arc<dyn QueryChunk>>,
Arc<Schema>,
&'a Schema,
) -> Result<P>
+ Clone
+ Send
@ -1592,7 +1592,7 @@ where
table_name: table_name.as_ref(),
})?;
f(&ctx, table_name, predicate, chunks, schema)
f(&ctx, table_name, predicate, chunks, &schema)
}
})
.try_collect()
@ -2004,25 +2004,24 @@ mod tests {
// test 1: empty predicate without need_fields
let predicate = Predicate::new();
let need_fields = false;
let projection = columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate);
let projection = columns_in_predicates(need_fields, &schema, table, &predicate);
assert_eq!(projection, None);
// test 2: empty predicate with need_fields
let need_fields = true;
let projection = columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate);
let projection = columns_in_predicates(need_fields, &schema, table, &predicate);
assert_eq!(projection, None);
// test 3: predicate on tag without need_fields
let predicate = Predicate::new().with_expr(col("foo").eq(lit("some_thing")));
let need_fields = false;
let projection =
columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap();
let projection = columns_in_predicates(need_fields, &schema, table, &predicate).unwrap();
// return index of foo
assert_eq!(projection, vec![1]);
// test 4: predicate on tag with need_fields
let need_fields = true;
let projection = columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate);
let projection = columns_in_predicates(need_fields, &schema, table, &predicate);
// return None means all fields
assert_eq!(projection, None);
@ -2032,7 +2031,7 @@ mod tests {
.with_field_columns(vec!["i64_field".to_string()]);
let need_fields = false;
let mut projection =
columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap();
columns_in_predicates(need_fields, &schema, table, &predicate).unwrap();
projection.sort();
// return indexes of i64_field and foo
assert_eq!(projection, vec![1, 2]);
@ -2040,7 +2039,7 @@ mod tests {
// test 6: predicate on tag with field_columns with need_fields
let need_fields = true;
let mut projection =
columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap();
columns_in_predicates(need_fields, &schema, table, &predicate).unwrap();
projection.sort();
// return indexes of foo and index of i64_field
assert_eq!(projection, vec![1, 2]);
@ -2051,7 +2050,7 @@ mod tests {
.with_field_columns(vec!["i64_field".to_string()]);
let need_fields = false;
let mut projection =
columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap();
columns_in_predicates(need_fields, &schema, table, &predicate).unwrap();
projection.sort();
// return indexes of bard and i64_field
assert_eq!(projection, vec![0, 2]);
@ -2059,7 +2058,7 @@ mod tests {
// test 7: predicate on tag and field with field_columns with need_fields
let need_fields = true;
let mut projection =
columns_in_predicates(need_fields, Arc::clone(&schema), table, &predicate).unwrap();
columns_in_predicates(need_fields, &schema, table, &predicate).unwrap();
projection.sort();
// return indexes of bard and i64_field
assert_eq!(projection, vec![0, 2]);

View File

@ -75,7 +75,7 @@ impl ReorgPlanner {
pub fn compact_plan<I>(
&self,
table_name: Arc<str>,
schema: Arc<Schema>,
schema: &Schema,
chunks: I,
output_sort_key: SortKey,
) -> Result<LogicalPlan>
@ -150,7 +150,7 @@ impl ReorgPlanner {
pub fn split_plan<I>(
&self,
table_name: Arc<str>,
schema: Arc<Schema>,
schema: &Schema,
chunks: I,
output_sort_key: SortKey,
split_times: Vec<i64>,
@ -214,7 +214,7 @@ mod test {
use super::*;
async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
async fn get_test_chunks() -> (Schema, Vec<Arc<dyn QueryChunk>>) {
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
TestChunk::new("t")
@ -261,16 +261,16 @@ mod test {
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await);
let schema = SchemaMerger::new()
.merge(&chunk1.schema())
.merge(chunk1.schema())
.unwrap()
.merge(&chunk2.schema())
.merge(chunk2.schema())
.unwrap()
.build();
(schema, vec![chunk1, chunk2])
}
async fn get_sorted_test_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
async fn get_sorted_test_chunks() -> (Schema, Vec<Arc<dyn QueryChunk>>) {
// Chunk 1
let chunk1 = Arc::new(
TestChunk::new("t")
@ -307,7 +307,7 @@ mod test {
];
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await);
(chunk1.schema(), vec![chunk1, chunk2])
(chunk1.schema().clone(), vec![chunk1, chunk2])
}
#[tokio::test]
@ -333,7 +333,7 @@ mod test {
.build();
let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.compact_plan(Arc::from("t"), Arc::clone(&schema), chunks, sort_key)
.compact_plan(Arc::from("t"), &schema, chunks, sort_key)
.expect("created compact plan");
let physical_plan = executor
@ -370,7 +370,7 @@ mod test {
.build();
let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.compact_plan(Arc::from("t"), schema, chunks, sort_key)
.compact_plan(Arc::from("t"), &schema, chunks, sort_key)
.expect("created compact plan");
let executor = Executor::new_testing();
@ -421,7 +421,7 @@ mod test {
// split on 1000 should have timestamps 1000, 5000, and 7000
let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000])
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000])
.expect("created compact plan");
let executor = Executor::new_testing();
@ -485,7 +485,7 @@ mod test {
// split on 1000 and 7000
let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000, 7000])
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000, 7000])
.expect("created compact plan");
let executor = Executor::new_testing();
@ -561,7 +561,7 @@ mod test {
// split on 1000 and 7000
let _split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.split_plan(Arc::from("t"), schema, chunks, sort_key, vec![]) // reason of panic: empty split_times
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![]) // reason of panic: empty split_times
.expect("created compact plan");
}
@ -580,7 +580,7 @@ mod test {
// split on 1000 and 7000
let _split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.split_plan(Arc::from("t"), schema, chunks, sort_key, vec![1000, 500]) // reason of panic: split_times not in ascending order
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000, 500]) // reason of panic: split_times not in ascending order
.expect("created compact plan");
}
}

View File

@ -44,7 +44,7 @@ pub trait QueryChunkMeta {
fn summary(&self) -> Arc<TableSummary>;
/// return a reference to the schema of the data held in this chunk
fn schema(&self) -> Arc<Schema>;
fn schema(&self) -> &Schema;
/// Return a reference to the chunk's partition sort key if any.
/// Only persisted chunks have a partition sort key
@ -191,7 +191,7 @@ impl QueryChunkData {
/// Read data into [`RecordBatch`]es. This is mostly meant for testing!
pub async fn read_to_batches(
self,
schema: Arc<Schema>,
schema: &Schema,
session_ctx: &SessionContext,
) -> Vec<RecordBatch> {
match self {
@ -274,7 +274,7 @@ where
self.as_ref().summary()
}
fn schema(&self) -> Arc<Schema> {
fn schema(&self) -> &Schema {
self.as_ref().schema()
}
@ -303,7 +303,7 @@ impl QueryChunkMeta for Arc<dyn QueryChunk> {
self.as_ref().summary()
}
fn schema(&self) -> Arc<Schema> {
fn schema(&self) -> &Schema {
self.as_ref().schema()
}
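With QueryChunkMeta::schema now returning &Schema, implementors keep an owned Schema field and hand out a borrow, and call sites clone only when they truly need ownership. A hedged sketch of the implementor side, mirroring the QueryAdaptor and TestChunk hunks in this commit (SomeChunk is a made-up name for illustration):

struct SomeChunk {
    schema: Schema,              // owned, built once (e.g. by a SchemaMerger)
    // ...
}

impl QueryChunkMeta for SomeChunk {
    fn schema(&self) -> &Schema {
        &self.schema             // cheap borrow instead of Arc::clone
    }
    // ... summary(), partition_sort_key(), and the other trait methods
}

// Call sites borrow by default and clone only where ownership is required:
let schema = chunk.schema().clone();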

View File

@ -8,7 +8,6 @@ use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::statement::Statement;
use predicate::rpc_predicate::QueryNamespaceMeta;
use schema::Schema;
use std::sync::Arc;
/// Returns the first `Field` of the `SELECT` statement.
pub(crate) fn get_first_field(s: &str) -> Field {
@ -86,8 +85,8 @@ impl QueryNamespaceMeta for MockNamespace {
.collect()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
fn table_schema(&self, table_name: &str) -> Option<Schema> {
let c = self.chunks.iter().find(|x| x.table_name() == table_name)?;
Some(c.schema())
Some(c.schema().clone())
}
}

View File

@ -100,7 +100,7 @@ pub trait ChunkPruner: Sync + Send + std::fmt::Debug {
fn prune_chunks(
&self,
table_name: &str,
table_schema: Arc<Schema>,
table_schema: &Schema,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Result<Vec<Arc<dyn QueryChunk>>>;
@ -112,7 +112,7 @@ pub trait ChunkPruner: Sync + Send + std::fmt::Debug {
#[derive(Debug)]
pub struct ProviderBuilder {
table_name: Arc<str>,
schema: Arc<Schema>,
schema: Schema,
chunks: Vec<Arc<dyn QueryChunk>>,
output_sort_key: Option<SortKey>,
deduplication: bool,
@ -122,7 +122,7 @@ pub struct ProviderBuilder {
}
impl ProviderBuilder {
pub fn new(table_name: Arc<str>, schema: Arc<Schema>, ctx: IOxSessionContext) -> Self {
pub fn new(table_name: Arc<str>, schema: Schema, ctx: IOxSessionContext) -> Self {
Self {
table_name,
schema,
@ -173,7 +173,7 @@ impl ProviderBuilder {
pub struct ChunkTableProvider {
table_name: Arc<str>,
/// The IOx schema (wrapper around Arrow Schemaref) for this table
iox_schema: Arc<Schema>,
iox_schema: Schema,
/// The chunks
chunks: Vec<Arc<dyn QueryChunk>>,
/// The desired output sort key if any
@ -187,8 +187,8 @@ pub struct ChunkTableProvider {
impl ChunkTableProvider {
/// Return the IOx schema view for the data provided by this provider
pub fn iox_schema(&self) -> Arc<Schema> {
Arc::clone(&self.iox_schema)
pub fn iox_schema(&self) -> &Schema {
&self.iox_schema
}
/// Return the Arrow schema view for the data provided by this provider
@ -229,10 +229,7 @@ impl TableProvider for ChunkTableProvider {
let chunks: Vec<Arc<dyn QueryChunk>> = self.chunks.to_vec();
// Figure out the schema of the requested output
let scan_schema = match projection {
Some(indices) => Arc::new(self.iox_schema.select_by_indices(indices)),
None => Arc::clone(&self.iox_schema),
};
let schema = projection.map(|indices| self.iox_schema.select_by_indices(indices));
// This debug shows the self.arrow_schema() includes all columns in all chunks
// which means the schema of all chunks are merged before invoking this scan
@ -247,7 +244,7 @@ impl TableProvider for ChunkTableProvider {
let plan = deduplicate.build_scan_plan(
Arc::clone(&self.table_name),
scan_schema,
schema.as_ref().unwrap_or(&self.iox_schema),
chunks,
predicate,
self.output_sort_key.clone(),
@ -502,7 +499,7 @@ impl Deduplicater {
pub(crate) fn build_scan_plan(
mut self,
table_name: Arc<str>,
output_schema: Arc<Schema>,
output_schema: &Schema,
chunks: Vec<Arc<dyn QueryChunk>>,
mut predicate: Predicate,
output_sort_key: Option<SortKey>,
@ -532,7 +529,7 @@ impl Deduplicater {
let mut non_duplicate_plans = Self::build_plans_for_non_duplicates_chunks(
self.ctx.child_ctx("build_plans_for_non_duplicates_chunks"),
Arc::clone(&output_schema),
output_schema,
chunks,
predicate,
output_sort_key.as_ref(),
@ -593,7 +590,7 @@ impl Deduplicater {
plans.push(Self::build_deduplicate_plan_for_overlapped_chunks(
self.ctx
.child_ctx("build_deduplicate_plan_for_overlapped_chunks"),
Arc::clone(&output_schema),
output_schema,
overlapped_chunks,
predicate.clone(),
&chunks_dedup_sort_key,
@ -622,7 +619,7 @@ impl Deduplicater {
plans.push(Self::build_deduplicate_plan_for_chunk_with_duplicates(
self.ctx
.child_ctx("build_deduplicate_plan_for_chunk_with_duplicates"),
Arc::clone(&output_schema),
output_schema,
chunk_with_duplicates,
predicate.clone(),
&chunk_dedup_sort_key,
@ -640,7 +637,7 @@ impl Deduplicater {
);
let mut non_duplicate_plans = Self::build_plans_for_non_duplicates_chunks(
self.ctx.child_ctx("build_plans_for_non_duplicates_chunks"),
Arc::clone(&output_schema),
output_schema,
chunks,
predicate,
output_sort_key.as_ref(),
@ -808,7 +805,7 @@ impl Deduplicater {
///```
fn build_deduplicate_plan_for_overlapped_chunks(
ctx: IOxSessionContext,
output_schema: Arc<Schema>,
output_schema: &Schema,
chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified as overlapped
predicate: Predicate,
output_sort_key: &SortKey,
@ -829,7 +826,7 @@ impl Deduplicater {
};
let pk_schema = Self::compute_pk_schema(&chunks, schema_interner);
let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner);
let input_schema = Self::compute_input_schema(output_schema, &pk_schema, schema_interner);
debug!(
?output_schema,
@ -844,7 +841,7 @@ impl Deduplicater {
.map(|chunk| {
Self::build_sort_plan_for_read_filter(
ctx.child_ctx("build_sort_plan_for_read_filter"),
Arc::clone(&input_schema),
&input_schema,
Arc::clone(chunk),
predicate.clone(),
Some(output_sort_key),
@ -900,7 +897,7 @@ impl Deduplicater {
///```
fn build_deduplicate_plan_for_chunk_with_duplicates(
ctx: IOxSessionContext,
output_schema: Arc<Schema>,
output_schema: &Schema,
chunk: Arc<dyn QueryChunk>, // This chunk is identified as having duplicates
predicate: Predicate,
output_sort_key: &SortKey,
@ -909,10 +906,10 @@ impl Deduplicater {
// This will practically never matter because this can only happen for in-memory chunks which are currently
// backed by RecordBatches and these don't do anything with the predicate at all. However to prevent weird
// future issues, we still transform the predicate here. (@crepererum, 2022-11-16)
let predicate = predicate.push_through_dedup(&chunk.schema());
let predicate = predicate.push_through_dedup(chunk.schema());
let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)], schema_interner);
let input_schema = Self::compute_input_schema(&output_schema, &pk_schema, schema_interner);
let input_schema = Self::compute_input_schema(output_schema, &pk_schema, schema_interner);
debug!(
?output_schema,
@ -927,7 +924,7 @@ impl Deduplicater {
// Create the 2 bottom nodes RecordBatchesExec and SortExec
let plan = Self::build_sort_plan_for_read_filter(
ctx.child_ctx("build_sort_plan_for_read_filter"),
Arc::clone(&input_schema),
&input_schema,
Arc::clone(&chunks[0]),
predicate,
Some(output_sort_key),
@ -963,7 +960,7 @@ impl Deduplicater {
///```
fn add_projection_node_if_needed(
output_schema: Arc<Schema>,
output_schema: &Schema,
input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let input_schema = input.schema();
@ -1042,7 +1039,7 @@ impl Deduplicater {
///```
fn build_sort_plan_for_read_filter(
ctx: IOxSessionContext,
output_schema: Arc<Schema>,
output_schema: &Schema,
chunk: Arc<dyn QueryChunk>,
predicate: Predicate, // This is the select predicate of the query
output_sort_key: Option<&SortKey>,
@ -1066,7 +1063,7 @@ impl Deduplicater {
trace!("Build sort plan for a single chunk. Sort node won't be added if the plan is already sorted");
let mut schema_merger = SchemaMerger::new()
.with_interner(schema_interner)
.merge(&output_schema)
.merge(output_schema)
.unwrap();
let chunk_schema = chunk.schema();
trace!(?chunk_schema, "chunk schema");
@ -1097,7 +1094,7 @@ impl Deduplicater {
// Create the bottom node RecordBatchesExec for this chunk
let mut input = chunks_to_physical_nodes(
input_schema,
&input_schema,
output_sort_key,
vec![Arc::clone(&chunk)],
predicate,
@ -1206,7 +1203,7 @@ impl Deduplicater {
// And some optional operators on top such as applying delete predicates or sort the chunk
fn build_plan_for_non_duplicates_chunk(
ctx: IOxSessionContext,
output_schema: Arc<Schema>,
output_schema: &Schema,
chunk: Arc<dyn QueryChunk>, // This chunk is identified as having no duplicates
predicate: Predicate,
output_sort_key: Option<&SortKey>,
@ -1256,7 +1253,7 @@ impl Deduplicater {
#[allow(clippy::too_many_arguments)]
fn build_plans_for_non_duplicates_chunks(
ctx: IOxSessionContext,
output_schema: Arc<Schema>,
output_schema: &Schema,
chunks: Chunks, // These chunks are identified as having no duplicates
predicate: Predicate,
output_sort_key: Option<&SortKey>,
@ -1291,7 +1288,7 @@ impl Deduplicater {
.map(|chunk| {
Self::build_plan_for_non_duplicates_chunk(
ctx.child_ctx("build_plan_for_non_duplicates_chunk"),
Arc::clone(&output_schema),
output_schema,
Arc::clone(chunk),
predicate.clone(),
output_sort_key,
@ -1307,7 +1304,7 @@ impl Deduplicater {
fn compute_pk_schema<'a>(
chunks: impl IntoIterator<Item = &'a Arc<dyn QueryChunk>>,
schema_interner: &mut SchemaInterner,
) -> Arc<Schema> {
) -> Schema {
let mut schema_merger = SchemaMerger::new().with_interner(schema_interner);
for chunk in chunks {
let chunk_schema = chunk.schema();
@ -1330,10 +1327,10 @@ impl Deduplicater {
fn compute_chunks_schema<'a>(
chunks: impl IntoIterator<Item = &'a Arc<dyn QueryChunk>>,
schema_interner: &mut SchemaInterner,
) -> Arc<Schema> {
) -> Schema {
let mut schema_merger = SchemaMerger::new().with_interner(schema_interner);
for chunk in chunks {
schema_merger = schema_merger.merge(&chunk.schema()).unwrap();
schema_merger = schema_merger.merge(chunk.schema()).unwrap();
}
schema_merger.build()
@ -1345,7 +1342,7 @@ impl Deduplicater {
output_schema: &Schema,
pk_schema: &Schema,
schema_interner: &mut SchemaInterner,
) -> Arc<Schema> {
) -> Schema {
SchemaMerger::new()
.with_interner(schema_interner)
.merge(output_schema)
@ -1621,7 +1618,7 @@ mod test {
let sort_plan = Deduplicater::build_sort_plan_for_read_filter(
IOxSessionContext::with_testing(),
Arc::clone(&schema),
schema,
Arc::clone(&chunk),
Predicate::default(),
Some(&sort_key.clone()),
@ -1775,7 +1772,7 @@ mod test {
// All chunks in one single scan
let plans = Deduplicater::build_plans_for_non_duplicates_chunks(
IOxSessionContext::with_testing(),
Arc::clone(&schema),
schema,
Chunks::split_overlapped_chunks(vec![Arc::clone(&chunk1), Arc::clone(&chunk2)], false)
.unwrap(),
Predicate::default(),
@ -1939,7 +1936,7 @@ mod test {
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
// the same for 2 chunks
let schema = chunk1.schema();
let schema = chunk1.schema().clone();
let chunks = vec![chunk1, chunk2];
// data in its original form
@ -1964,7 +1961,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxSessionContext::with_testing(),
Arc::clone(&schema),
&schema,
chunks,
Predicate::default(),
&output_sort_key,
@ -2024,7 +2021,7 @@ mod test {
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxSessionContext::with_testing(),
schema,
&schema,
vec![chunk1, chunk2],
Predicate::default(),
&output_sort_key,
@ -2100,7 +2097,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxSessionContext::with_testing(),
Arc::new(schema),
&schema,
chunks,
Predicate::default(),
&output_sort_key,
@ -2199,7 +2196,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxSessionContext::with_testing(),
Arc::new(schema),
&schema,
chunks,
Predicate::default(),
&output_sort_key,
@ -2267,11 +2264,11 @@ mod test {
// With provided stats, the computed key will be (tag2, tag1, tag3, time)
// Requested output schema == the schema for all three
let schema = SchemaMerger::new()
.merge(chunk1.schema().as_ref())
.merge(chunk1.schema())
.unwrap()
.merge(chunk2.schema().as_ref())
.merge(chunk2.schema())
.unwrap()
.merge(chunk3.schema().as_ref())
.merge(chunk3.schema())
.unwrap()
.build();
@ -2306,7 +2303,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxSessionContext::with_testing(),
schema,
&schema,
chunks,
Predicate::default(),
&output_sort_key,
@ -2364,7 +2361,7 @@ mod test {
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk.schema();
let schema = chunk.schema().clone();
let chunks = vec![chunk];
// data in its original form
@ -2385,7 +2382,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
None,
@ -2400,7 +2397,7 @@ mod test {
let deduplicator =
Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false);
let plan = deduplicator
.build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None)
.build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None)
.unwrap();
let batch = test_collect(plan).await;
// The data will stay in their original order
@ -2433,7 +2430,7 @@ mod test {
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk.schema();
let schema = chunk.schema().clone();
let chunks = vec![chunk];
// data in its original form
@ -2459,7 +2456,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
None,
@ -2487,7 +2484,7 @@ mod test {
let deduplicator =
Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false);
let plan = deduplicator
.build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None)
.build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None)
.unwrap();
let batch = test_collect(plan).await;
// Deduplication is disabled, the output should be the same as the original data
@ -2546,13 +2543,12 @@ mod test {
.timestamp()
.build()
.unwrap();
let schema = Arc::new(schema);
let deduplicator = Deduplicater::new(IOxSessionContext::with_testing());
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&Arc::clone(&schema)),
&schema,
chunks.clone(),
Predicate::default(),
None,
@ -2581,13 +2577,7 @@ mod test {
let deduplicator =
Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false);
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
chunks,
Predicate::default(),
None,
)
.build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None)
.unwrap();
let batch = test_collect(plan).await;
// Deduplication is disabled, the output should include all rows but only 2 selected columns
@ -2655,7 +2645,7 @@ mod test {
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk1.schema();
let schema = chunk1.schema().clone();
let chunks = vec![chunk1, chunk2];
// data in its original form
@ -2686,7 +2676,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
None,
@ -2715,7 +2705,7 @@ mod test {
let deduplicator =
Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false);
let plan = deduplicator
.build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None)
.build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None)
.unwrap();
let batch = test_collect(plan).await;
// Deduplication is disabled, the output should be the same as the original data
@ -2816,7 +2806,7 @@ mod test {
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk1.schema();
let schema = chunk1.schema().clone();
let chunks = vec![chunk1, chunk2, chunk3, chunk4];
// data in its original form
@ -2855,7 +2845,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
None,
@ -2935,7 +2925,7 @@ mod test {
let deduplicator =
Deduplicater::new(IOxSessionContext::with_testing()).enable_deduplication(false);
let plan = deduplicator
.build_scan_plan(Arc::from("t"), schema, chunks, Predicate::default(), None)
.build_scan_plan(Arc::from("t"), &schema, chunks, Predicate::default(), None)
.unwrap();
// Plan is very simple with one single RecordBatchesExec that includes 4 chunks
@ -3042,7 +3032,7 @@ mod test {
) as Arc<dyn QueryChunk>;
// Datafusion schema of the chunk
let schema = chunk1.schema();
let schema = chunk1.schema().clone();
let chunks = vec![chunk1, chunk2, chunk3, chunk4];
// data in its original form
@ -3081,7 +3071,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
Some(sort_key.clone()), // Ask to sort the plan output
@ -3158,7 +3148,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
schema,
&schema,
chunks,
Predicate::default(),
Some(sort_key.clone()),
@ -3260,13 +3250,13 @@ mod test {
.with_sort_key(sort_key.clone()), // signal the chunk is sorted
) as Arc<dyn QueryChunk>;
let schema = chunk1.schema();
let schema = chunk1.schema().clone();
let chunks = vec![chunk1, chunk2, chunk3, chunk4];
let deduplicator = Deduplicater::new(IOxSessionContext::with_testing());
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
Some(sort_key.clone()),
@ -3325,7 +3315,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
schema,
&schema,
chunks,
Predicate::default(),
Some(sort_key),
@ -3488,7 +3478,7 @@ mod test {
.with_sort_key(sort_key.clone()), // signal the chunk is sorted
) as Arc<dyn QueryChunk>;
let schema = chunk1_1.schema();
let schema = chunk1_1.schema().clone();
let chunks = vec![
chunk1_1, chunk1_2, chunk1_3, chunk1_4, chunk2_1, chunk2_2, chunk2_3, chunk2_4,
chunk2_5, chunk2_6,
@ -3497,7 +3487,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
Arc::clone(&schema),
&schema,
chunks.clone(),
Predicate::default(),
Some(sort_key.clone()),
@ -3557,7 +3547,7 @@ mod test {
let plan = deduplicator
.build_scan_plan(
Arc::from("t"),
schema,
&schema,
chunks,
Predicate::default(),
Some(sort_key),

View File

@ -119,7 +119,7 @@ fn combine_sort_key(
/// pushdown ([`RecordBatchesExec`] has NO builtin filter function). Delete predicates are NOT applied at all. The
/// caller is responsible for wrapping the output node into appropriate filter nodes.
pub fn chunks_to_physical_nodes(
iox_schema: Arc<Schema>,
iox_schema: &Schema,
output_sort_key: Option<&SortKey>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,

View File

@ -72,8 +72,8 @@ pub trait PruningObserver {
/// filtering those where the predicate can be proven to evaluate to
/// `false` for every single row.
pub fn prune_chunks(
table_schema: Arc<Schema>,
chunks: &Vec<Arc<dyn QueryChunk>>,
table_schema: &Schema,
chunks: &[Arc<dyn QueryChunk>],
predicate: &Predicate,
) -> Result<Vec<bool>, NotPrunedReason> {
let num_chunks = chunks.len();
@ -85,8 +85,8 @@ pub fn prune_chunks(
/// Given a `Vec` of pruning summaries, return a `Vec<bool>` where `false` indicates that the
/// predicate can be proven to evaluate to `false` for every single row.
pub fn prune_summaries(
table_schema: Arc<Schema>,
summaries: &Vec<Arc<TableSummary>>,
table_schema: &Schema,
summaries: &[Arc<TableSummary>],
predicate: &Predicate,
) -> Result<Vec<bool>, NotPrunedReason> {
let filter_expr = match predicate.filter_expr() {
@ -108,7 +108,7 @@ pub fn prune_summaries(
};
let statistics = ChunkPruningStatistics {
table_schema: table_schema.as_ref(),
table_schema,
summaries,
};
@ -126,7 +126,7 @@ pub fn prune_summaries(
/// interface required by [`PruningPredicate`]
struct ChunkPruningStatistics<'a> {
table_schema: &'a Schema,
summaries: &'a Vec<Arc<TableSummary>>,
summaries: &'a [Arc<TableSummary>],
}
impl<'a> ChunkPruningStatistics<'a> {
@ -263,7 +263,7 @@ mod test {
let c1 = Arc::new(TestChunk::new("chunk1"));
let predicate = Predicate::new();
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result, Err(NotPrunedReason::NoExpressionOnPredicate));
}
@ -281,7 +281,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100.0f64)));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![false]);
}
@ -299,7 +299,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100i64)));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![false]);
}
@ -318,7 +318,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100u64)));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![false]);
}
@ -335,7 +335,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1"));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![false; 1]);
}
@ -355,7 +355,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").gt(lit("z")));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![false]);
}
@ -372,7 +372,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").lt(lit(100.0f64)));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![true]);
}
@ -390,7 +390,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").lt(lit(100i64)));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![true]);
}
@ -408,7 +408,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").lt(lit(100u64)));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![true]);
}
@ -426,7 +426,7 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1"));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![true]);
}
@ -446,14 +446,14 @@ mod test {
let predicate = Predicate::new().with_expr(col("column1").lt(lit("z")));
let result = prune_chunks(c1.schema(), &vec![c1], &predicate);
let result = prune_chunks(&c1.schema().clone(), &[c1], &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![true]);
}
fn merge_schema(chunks: &[Arc<dyn QueryChunk>]) -> Arc<Schema> {
fn merge_schema(chunks: &[Arc<dyn QueryChunk>]) -> Schema {
let mut merger = SchemaMerger::new();
for chunk in chunks {
merger = merger.merge(chunk.schema().as_ref()).unwrap();
merger = merger.merge(chunk.schema()).unwrap();
}
merger.build()
}
@ -491,7 +491,7 @@ mod test {
let chunks = vec![c1, c2, c3, c4];
let schema = merge_schema(&chunks);
let result = prune_chunks(schema, &chunks, &predicate);
let result = prune_chunks(&schema, &chunks, &predicate);
assert_eq!(
result.expect("pruning succeeds"),
@ -549,7 +549,7 @@ mod test {
let chunks = vec![c1, c2, c3, c4, c5, c6];
let schema = merge_schema(&chunks);
let result = prune_chunks(schema, &chunks, &predicate);
let result = prune_chunks(&schema, &chunks, &predicate);
assert_eq!(
result.expect("pruning succeeds"),
@ -587,7 +587,7 @@ mod test {
let chunks = vec![c1, c2, c3];
let schema = merge_schema(&chunks);
let result = prune_chunks(schema, &chunks, &predicate);
let result = prune_chunks(&schema, &chunks, &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![false, true, true]);
}
@ -645,7 +645,7 @@ mod test {
let chunks = vec![c1, c2, c3];
let schema = merge_schema(&chunks);
let result = prune_chunks(schema, &chunks, &predicate);
let result = prune_chunks(&schema, &chunks, &predicate);
assert_eq!(result.expect("pruning succeeds"), vec![true, false, false]);
}
@ -706,7 +706,7 @@ mod test {
let chunks = vec![c1, c2, c3, c4, c5, c6];
let schema = merge_schema(&chunks);
let result = prune_chunks(schema, &chunks, &predicate);
let result = prune_chunks(&schema, &chunks, &predicate);
assert_eq!(
result.expect("Pruning succeeds"),

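With prune_chunks and prune_summaries now borrowing the table schema and taking slices, a caller merges the chunk schemas once and lends the result, exactly as the tests above do. A small sketch using the helper and names from those tests (chunks and predicate are assumed to be in scope):

let schema = merge_schema(&chunks);                     // owned Schema merged from all chunk schemas
let keep = prune_chunks(&schema, &chunks, &predicate)
    .expect("pruning succeeds");                        // Vec<bool>; false = chunk provably matches no rows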
View File

@ -142,7 +142,7 @@ impl QueryNamespace for TestDatabase {
}
impl QueryNamespaceMeta for TestDatabase {
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
fn table_schema(&self, table_name: &str) -> Option<Schema> {
let mut merger = SchemaMerger::new();
let mut found_one = false;
@ -150,7 +150,7 @@ impl QueryNamespaceMeta for TestDatabase {
for partition in partitions.values() {
for chunk in partition.values() {
if chunk.table_name() == table_name {
merger = merger.merge(&chunk.schema()).expect("consistent schemas");
merger = merger.merge(chunk.schema()).expect("consistent schemas");
found_one = true;
}
}
@ -188,7 +188,7 @@ pub struct TestChunk {
table_name: String,
/// Schema of the table
schema: Arc<Schema>,
schema: Schema,
/// Return value for summary()
table_summary: TableSummary,
@ -294,7 +294,7 @@ impl TestChunk {
let table_name = table_name.into();
Self {
table_name,
schema: Arc::new(SchemaBuilder::new().build().unwrap()),
schema: SchemaBuilder::new().build().unwrap(),
table_summary: TableSummary::default(),
id: ChunkId::new_test(0),
may_contain_pk_duplicates: Default::default(),
@ -552,9 +552,7 @@ impl TestChunk {
) -> Self {
let mut merger = SchemaMerger::new();
merger = merger.merge(&new_column_schema).unwrap();
merger = merger
.merge(self.schema.as_ref())
.expect("merging was successful");
merger = merger.merge(&self.schema).expect("merging was successful");
self.schema = merger.build();
for i in 0..new_column_schema.len() {
@ -627,7 +625,7 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch");
if !self.quiet {
println!("TestChunk batch data: {:#?}", batch);
}
@ -667,7 +665,7 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch");
if !self.quiet {
println!("TestChunk batch data: {:#?}", batch);
}
@ -731,7 +729,7 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch");
self.table_data.push(Arc::new(batch));
self
@ -794,7 +792,7 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch");
self.table_data.push(Arc::new(batch));
self
@ -864,7 +862,7 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch");
self.table_data.push(Arc::new(batch));
self
@ -941,7 +939,7 @@ impl TestChunk {
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
RecordBatch::try_new(self.schema.as_arrow(), columns).expect("made record batch");
self.table_data.push(Arc::new(batch));
self
@ -1069,8 +1067,8 @@ impl QueryChunkMeta for TestChunk {
Arc::new(self.table_summary.clone())
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(&self.schema)
fn schema(&self) -> &Schema {
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {

View File

@ -424,11 +424,7 @@ impl TestTable {
.collect();
let schema = table_schema.select_by_names(&selection).unwrap();
let chunk = ParquetChunk::new(
Arc::new(file),
Arc::new(schema),
self.catalog.parquet_store.clone(),
);
let chunk = ParquetChunk::new(Arc::new(file), schema, self.catalog.parquet_store.clone());
chunk
.parquet_exec_input()
.read_to_batches(
@ -624,7 +620,7 @@ impl TestPartition {
);
let row_count = record_batch.num_rows();
assert!(row_count > 0, "Parquet file must have at least 1 row");
let (record_batch, sort_key) = sort_batch(record_batch, schema.clone());
let (record_batch, sort_key) = sort_batch(record_batch, &schema);
let record_batch = dedup_batch(record_batch, &sort_key);
let object_store_id = object_store_id.unwrap_or_else(Uuid::new_v4);
@ -974,7 +970,7 @@ impl TestParquetFile {
}
/// Get Parquet file schema.
pub async fn schema(&self) -> Arc<Schema> {
pub async fn schema(&self) -> Schema {
let table_schema = self.table.catalog_schema().await;
let column_id_lookup = table_schema.column_id_map();
let selection: Vec<_> = self
@ -984,7 +980,7 @@ impl TestParquetFile {
.map(|id| *column_id_lookup.get(id).unwrap())
.collect();
let table_schema: Schema = table_schema.clone().try_into().unwrap();
Arc::new(table_schema.select_by_names(&selection).unwrap())
table_schema.select_by_names(&selection).unwrap()
}
}
@ -1018,9 +1014,9 @@ pub fn now() -> Time {
}
/// Sort an Arrow record batch, returning the sorted batch and its sort key.
fn sort_batch(record_batch: RecordBatch, schema: Schema) -> (RecordBatch, SortKey) {
fn sort_batch(record_batch: RecordBatch, schema: &Schema) -> (RecordBatch, SortKey) {
// calculate realistic sort key
let sort_key = compute_sort_key(&schema, std::iter::once(&record_batch));
let sort_key = compute_sort_key(schema, std::iter::once(&record_batch));
// set up sorting
let mut sort_columns = Vec::with_capacity(record_batch.num_columns());

View File

@ -18,7 +18,7 @@ pub struct ParquetChunk {
parquet_file: Arc<ParquetFile>,
/// Schema that goes with this table's parquet file
schema: Arc<Schema>,
schema: Schema,
/// Persists the parquet file within a namespace's relative path
store: ParquetStorage,
@ -26,7 +26,7 @@ pub struct ParquetChunk {
impl ParquetChunk {
/// Create parquet chunk.
pub fn new(parquet_file: Arc<ParquetFile>, schema: Arc<Schema>, store: ParquetStorage) -> Self {
pub fn new(parquet_file: Arc<ParquetFile>, schema: Schema, store: ParquetStorage) -> Self {
Self {
parquet_file,
schema,
@ -55,9 +55,9 @@ impl ParquetChunk {
mem::size_of_val(self) + self.parquet_file.size() - mem::size_of_val(&self.parquet_file)
}
/// Infallably return the full schema (for all columns) for this chunk
pub fn schema(&self) -> Arc<Schema> {
Arc::clone(&self.schema)
/// Infallibly return the full schema (for all columns) for this chunk
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Return the columns names that belong to the given column selection

View File

@ -760,7 +760,7 @@ impl DecodedIoxParquetMetaData {
}
/// Read IOx schema from parquet metadata.
pub fn read_schema(&self) -> Result<Arc<Schema>> {
pub fn read_schema(&self) -> Result<Schema> {
let file_metadata = self.md.file_metadata();
let arrow_schema = parquet_to_arrow_schema(
@ -776,10 +776,9 @@ impl DecodedIoxParquetMetaData {
// as this metadata will vary from file to file
let arrow_schema_ref = Arc::new(arrow_schema.with_metadata(Default::default()));
let schema: Schema = arrow_schema_ref
arrow_schema_ref
.try_into()
.context(IoxFromArrowFailureSnafu {})?;
Ok(Arc::new(schema))
.context(IoxFromArrowFailureSnafu {})
}
/// Read IOx statistics (including timestamp range) from parquet metadata.

View File

@ -169,7 +169,9 @@ pub trait QueryNamespaceMeta {
fn table_names(&self) -> Vec<String>;
/// Schema for a specific table if the table exists.
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
///
/// TODO: Make this return Option<&Schema>
fn table_schema(&self, table_name: &str) -> Option<Schema>;
}
/// Predicate that has been "specialized" / normalized for a
@ -204,13 +206,13 @@ pub trait QueryNamespaceMeta {
/// ```
fn normalize_predicate(
table_name: &str,
schema: Arc<Schema>,
schema: Schema,
predicate: &Predicate,
) -> DataFusionResult<Predicate> {
let mut predicate = predicate.clone();
let mut field_projections = FieldProjectionRewriter::new(Arc::clone(&schema));
let mut missing_tag_columns = MissingTagColumnRewriter::new(Arc::clone(&schema));
let mut field_projections = FieldProjectionRewriter::new(schema.clone());
let mut missing_tag_columns = MissingTagColumnRewriter::new(schema.clone());
let mut field_value_exprs = vec![];
@ -221,7 +223,7 @@ fn normalize_predicate(
.exprs
.into_iter()
.map(|e| {
let simplifier = ExprSimplifier::new(SimplifyAdapter::new(schema.as_ref()));
let simplifier = ExprSimplifier::new(SimplifyAdapter::new(&schema));
debug!(?e, "rewriting expr");
@ -349,10 +351,9 @@ mod tests {
#[test]
fn test_normalize_predicate_coerced() {
let schema = schema();
let predicate = normalize_predicate(
"table",
Arc::clone(&schema),
schema(),
&Predicate::new().with_expr(col("t1").eq(lit("f1"))),
)
.unwrap();
@ -470,8 +471,8 @@ mod tests {
assert_eq!(predicate, expected);
}
fn schema() -> Arc<Schema> {
let schema = schema::builder::SchemaBuilder::new()
fn schema() -> Schema {
schema::builder::SchemaBuilder::new()
.tag("t1")
.tag("t2")
.field("f1", DataType::Int64)
@ -479,9 +480,7 @@ mod tests {
.field("f2", DataType::Int64)
.unwrap()
.build()
.unwrap();
Arc::new(schema)
.unwrap()
}
#[allow(dead_code)]

View File

@ -1,5 +1,3 @@
use std::sync::Arc;
use datafusion::{
error::Result as DataFusionResult, logical_expr::expr_rewriter::ExprRewriter, prelude::*,
scalar::ScalarValue,
@ -11,12 +9,12 @@ use schema::{InfluxColumnType, Schema};
#[derive(Debug)]
pub(crate) struct MissingTagColumnRewriter {
/// The input schema
schema: Arc<Schema>,
schema: Schema,
}
impl MissingTagColumnRewriter {
/// Create a new [`MissingTagColumnRewriter`] targeting the given schema
pub(crate) fn new(schema: Arc<Schema>) -> Self {
pub(crate) fn new(schema: Schema) -> Self {
Self { schema }
}
@ -105,7 +103,7 @@ mod tests {
.build()
.unwrap();
let mut rewriter = MissingTagColumnRewriter::new(Arc::new(schema));
let mut rewriter = MissingTagColumnRewriter::new(schema);
expr.rewrite(&mut rewriter).unwrap()
}
}

View File

@ -45,12 +45,12 @@ pub(crate) struct FieldProjectionRewriter {
/// output
field_predicates: Vec<Expr>,
/// The input schema (from where we know the field)
schema: Arc<Schema>,
schema: Schema,
}
impl FieldProjectionRewriter {
/// Create a new [`FieldProjectionRewriter`] targeting the given schema
pub(crate) fn new(schema: Arc<Schema>) -> Self {
pub(crate) fn new(schema: Schema) -> Self {
Self {
field_predicates: vec![],
schema,
@ -429,7 +429,7 @@ mod tests {
"Running test\ninput: {:?}\nexpected_expr: {:?}\nexpected_field_columns: {:?}\n",
input, exp_expr, exp_field_columns
);
let mut rewriter = FieldProjectionRewriter::new(Arc::clone(&schema));
let mut rewriter = FieldProjectionRewriter::new(schema.clone());
let rewritten = rewriter.rewrite_field_exprs(input).unwrap();
assert_eq!(rewritten, exp_expr);
@ -471,9 +471,8 @@ mod tests {
input, exp_error
);
let schema = Arc::clone(&schema);
let run_case = move || {
let mut rewriter = FieldProjectionRewriter::new(schema);
let run_case = || {
let mut rewriter = FieldProjectionRewriter::new(schema.clone());
// check for error in rewrite_field_exprs
rewriter.rewrite_field_exprs(input)?;
// check for error adding to predicate
@ -500,7 +499,7 @@ mod tests {
query_functions::regex_not_match_expr(arg, pattern.into())
}
fn make_schema() -> Arc<Schema> {
fn make_schema() -> Schema {
SchemaBuilder::new()
.tag("foo")
.tag("bar")
@ -513,7 +512,6 @@ mod tests {
.field("f4", DataType::Float64)
.unwrap()
.build()
.map(Arc::new)
.unwrap()
}
}

View File

@ -206,7 +206,7 @@ impl NamespaceCache {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedTable {
pub id: TableId,
pub schema: Arc<Schema>,
pub schema: Schema,
pub column_id_map: HashMap<ColumnId, Arc<str>>,
pub column_id_map_rev: HashMap<Arc<str>, ColumnId>,
pub primary_key_column_ids: Vec<ColumnId>,
@ -242,7 +242,7 @@ impl From<TableSchema> for CachedTable {
column_id_map.shrink_to_fit();
let id = table.id;
let schema: Arc<Schema> = Arc::new(table.try_into().expect("Catalog table schema broken"));
let schema: Schema = table.try_into().expect("Catalog table schema broken");
let mut column_id_map_rev: HashMap<Arc<str>, ColumnId> = column_id_map
.iter()
@ -368,15 +368,13 @@ mod tests {
Arc::from("table1"),
Arc::new(CachedTable {
id: table11.table.id,
schema: Arc::new(
SchemaBuilder::new()
.field("col1", DataType::Int64)
.unwrap()
.tag("col2")
.timestamp()
.build()
.unwrap(),
),
schema: SchemaBuilder::new()
.field("col1", DataType::Int64)
.unwrap()
.tag("col2")
.timestamp()
.build()
.unwrap(),
column_id_map: HashMap::from([
(col111.column.id, Arc::from(col111.column.name.clone())),
(col112.column.id, Arc::from(col112.column.name.clone())),
@ -394,14 +392,12 @@ mod tests {
Arc::from("table2"),
Arc::new(CachedTable {
id: table12.table.id,
schema: Arc::new(
SchemaBuilder::new()
.field("col1", DataType::Float64)
.unwrap()
.timestamp()
.build()
.unwrap(),
),
schema: SchemaBuilder::new()
.field("col1", DataType::Float64)
.unwrap()
.timestamp()
.build()
.unwrap(),
column_id_map: HashMap::from([
(col121.column.id, Arc::from(col121.column.name.clone())),
(col122.column.id, Arc::from(col122.column.name.clone())),
@ -433,7 +429,7 @@ mod tests {
Arc::from("table1"),
Arc::new(CachedTable {
id: table21.table.id,
schema: Arc::new(SchemaBuilder::new().timestamp().build().unwrap()),
schema: SchemaBuilder::new().timestamp().build().unwrap(),
column_id_map: HashMap::from([(
col211.column.id,
Arc::from(col211.column.name.clone()),

View File

@ -532,7 +532,7 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
}
fn schema() -> Arc<Schema> {
Arc::new(SchemaBuilder::new().build().unwrap())
fn schema() -> Schema {
SchemaBuilder::new().build().unwrap()
}
}

View File

@ -58,7 +58,7 @@ impl CacheKey {
type CacheT = Box<
dyn Cache<
K = CacheKey,
V = Arc<Schema>,
V = Schema,
GetExtra = (Arc<CachedTable>, Option<Span>),
PeekExtra = ((), Option<Span>),
>,
@ -97,12 +97,10 @@ impl ProjectedSchemaCache {
// order by name since IDs are rather arbitrary
projection.sort();
Arc::new(
table
.schema
.select_by_names(&projection)
.expect("Bug in schema projection"),
)
table
.schema
.select_by_names(&projection)
.expect("Bug in schema projection")
});
let loader = Arc::new(MetricsLoader::new(
loader,
@ -117,7 +115,7 @@ impl ProjectedSchemaCache {
backend.add_policy(LruPolicy::new(
Arc::clone(&ram_pool),
CACHE_ID,
Arc::new(FunctionEstimator::new(|k: &CacheKey, v: &Arc<Schema>| {
Arc::new(FunctionEstimator::new(|k: &CacheKey, v: &Schema| {
RamSize(k.size() + size_of_val(v) + v.estimate_size())
})),
));
@ -148,7 +146,7 @@ impl ProjectedSchemaCache {
table: Arc<CachedTable>,
projection: Vec<ColumnId>,
span: Option<Span>,
) -> Arc<Schema> {
) -> Schema {
let key = CacheKey::new(table.id, projection);
self.cache.get(key, (table, span)).await
@ -176,25 +174,21 @@ mod tests {
let table_id_1 = TableId::new(1);
let table_id_2 = TableId::new(2);
let table_schema_a = Arc::new(
SchemaBuilder::new()
.tag("t1")
.tag("t2")
.tag("t3")
.timestamp()
.build()
.unwrap(),
);
let table_schema_b = Arc::new(
SchemaBuilder::new()
.tag("t1")
.tag("t2")
.tag("t3")
.tag("t4")
.timestamp()
.build()
.unwrap(),
);
let table_schema_a = SchemaBuilder::new()
.tag("t1")
.tag("t2")
.tag("t3")
.timestamp()
.build()
.unwrap();
let table_schema_b = SchemaBuilder::new()
.tag("t1")
.tag("t2")
.tag("t3")
.tag("t4")
.timestamp()
.build()
.unwrap();
let column_id_map_a = HashMap::from([
(ColumnId::new(1), Arc::from("t1")),
(ColumnId::new(2), Arc::from("t2")),
@ -210,7 +204,7 @@ mod tests {
]);
let table_1a = Arc::new(CachedTable {
id: table_id_1,
schema: Arc::clone(&table_schema_a),
schema: table_schema_a.clone(),
column_id_map: column_id_map_a.clone(),
column_id_map_rev: reverse_map(&column_id_map_a),
primary_key_column_ids: vec![
@ -222,7 +216,7 @@ mod tests {
});
let table_1b = Arc::new(CachedTable {
id: table_id_1,
schema: Arc::clone(&table_schema_b),
schema: table_schema_b.clone(),
column_id_map: column_id_map_b.clone(),
column_id_map_rev: reverse_map(&column_id_map_b),
primary_key_column_ids: vec![
@ -234,7 +228,7 @@ mod tests {
});
let table_2a = Arc::new(CachedTable {
id: table_id_2,
schema: Arc::clone(&table_schema_a),
schema: table_schema_a.clone(),
column_id_map: column_id_map_a.clone(),
column_id_map_rev: reverse_map(&column_id_map_a),
primary_key_column_ids: vec![
@ -247,7 +241,7 @@ mod tests {
});
// initial request
let expected = Arc::new(SchemaBuilder::new().tag("t1").tag("t2").build().unwrap());
let expected = SchemaBuilder::new().tag("t1").tag("t2").build().unwrap();
let projection_1 = cache
.get(
Arc::clone(&table_1a),
@ -265,7 +259,7 @@ mod tests {
None,
)
.await;
assert!(Arc::ptr_eq(&projection_1, &projection_2));
assert!(Arc::ptr_eq(projection_1.inner(), projection_2.inner()));
// updated table schema
let projection_3 = cache
@ -275,7 +269,7 @@ mod tests {
None,
)
.await;
assert!(Arc::ptr_eq(&projection_1, &projection_3));
assert!(Arc::ptr_eq(projection_1.inner(), projection_3.inner()));
// different column order
let projection_4 = cache
@ -285,10 +279,10 @@ mod tests {
None,
)
.await;
assert!(Arc::ptr_eq(&projection_1, &projection_4));
assert!(Arc::ptr_eq(projection_1.inner(), projection_4.inner()));
// different columns set
let expected = Arc::new(SchemaBuilder::new().tag("t1").tag("t3").build().unwrap());
let expected = SchemaBuilder::new().tag("t1").tag("t3").build().unwrap();
let projection_5 = cache
.get(
Arc::clone(&table_1a),
@ -307,7 +301,7 @@ mod tests {
)
.await;
assert_eq!(projection_6, projection_1);
assert!(!Arc::ptr_eq(&projection_1, &projection_6));
assert!(!Arc::ptr_eq(projection_1.inner(), projection_6.inner()));
// original data still present
let projection_7 = cache
@ -317,7 +311,7 @@ mod tests {
None,
)
.await;
assert!(Arc::ptr_eq(&projection_1, &projection_7));
assert!(Arc::ptr_eq(projection_1.inner(), projection_7.inner()));
}
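// A minimal sketch of why the pointer checks above go through `inner()`: assuming
// `Schema` keeps its Arrow schema behind an `Arc` exposed by `inner()` (as these
// assertions suggest), cloning a cached `Schema` is cheap and preserves identity,
// so no outer `Arc<Schema>` is needed.
#[test]
fn test_schema_clone_shares_inner() {
    use schema::SchemaBuilder;
    use std::sync::Arc;

    let schema = SchemaBuilder::new().tag("t1").timestamp().build().unwrap();
    let cloned = schema.clone();

    // Structurally equal and backed by the same Arrow schema allocation.
    assert_eq!(schema, cloned);
    assert!(Arc::ptr_eq(schema.inner(), cloned.inner()));
}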
fn reverse_map<K, V>(map: &HashMap<K, V>) -> HashMap<V, K>

View File

@ -617,7 +617,7 @@ impl IngesterStreamDecoder {
.take()
.expect("Partition should have been checked before chunk creation");
self.current_partition =
Some(current_partition.try_add_chunk(ChunkId::new(), Arc::new(schema), batches)?);
Some(current_partition.try_add_chunk(ChunkId::new(), schema, batches)?);
}
Ok(())
@ -1043,7 +1043,7 @@ impl IngesterPartition {
pub(crate) fn try_add_chunk(
mut self,
chunk_id: ChunkId,
expected_schema: Arc<Schema>,
expected_schema: Schema,
batches: Vec<RecordBatch>,
) -> Result<Self> {
// ignore chunk if there are no batches
@ -1059,7 +1059,7 @@ impl IngesterPartition {
// details)
let batches = batches
.into_iter()
.map(|batch| ensure_schema(batch, expected_schema.as_ref()))
.map(|batch| ensure_schema(batch, &expected_schema))
.collect::<Result<Vec<RecordBatch>>>()?;
// TODO: may want to ask the Ingester to send this value instead of computing it here.
@ -1141,7 +1141,7 @@ impl IngesterPartition {
pub struct IngesterChunk {
chunk_id: ChunkId,
partition_id: PartitionId,
schema: Arc<Schema>,
schema: Schema,
/// Partition-wide sort key.
partition_sort_key: Option<Arc<SortKey>>,
@ -1198,9 +1198,9 @@ impl QueryChunkMeta for IngesterChunk {
Arc::clone(&self.summary)
}
fn schema(&self) -> Arc<Schema> {
fn schema(&self) -> &Schema {
trace!(schema=?self.schema, "IngesterChunk schema");
Arc::clone(&self.schema)
&self.schema
}
fn partition_sort_key(&self) -> Option<&SortKey> {
@ -2011,16 +2011,14 @@ mod tests {
.await
}
fn schema() -> Arc<Schema> {
Arc::new(
SchemaBuilder::new()
.influx_field("bar", InfluxFieldType::Float)
.influx_field("baz", InfluxFieldType::Float)
.influx_field("foo", InfluxFieldType::Float)
.timestamp()
.build()
.unwrap(),
)
fn schema() -> Schema {
SchemaBuilder::new()
.influx_field("bar", InfluxFieldType::Float)
.influx_field("baz", InfluxFieldType::Float)
.influx_field("foo", InfluxFieldType::Float)
.timestamp()
.build()
.unwrap()
}
fn lp_to_record_batch(lp: &str) -> RecordBatch {
@ -2165,7 +2163,7 @@ mod tests {
#[test]
fn test_ingester_partition_type_cast() {
let expected_schema = Arc::new(SchemaBuilder::new().tag("t").timestamp().build().unwrap());
let expected_schema = SchemaBuilder::new().tag("t").timestamp().build().unwrap();
let cases = vec![
// send a batch that matches the schema exactly
@ -2188,7 +2186,7 @@ mod tests {
tombstone_max_sequence_number,
None,
)
.try_add_chunk(ChunkId::new(), Arc::clone(&expected_schema), vec![case])
.try_add_chunk(ChunkId::new(), expected_schema.clone(), vec![case])
.unwrap();
for batch in &ingester_partition.chunks[0].batches {
@ -2199,15 +2197,12 @@ mod tests {
#[test]
fn test_ingester_partition_fail_type_cast() {
let expected_schema = Arc::new(
SchemaBuilder::new()
.field("b", DataType::Boolean)
.unwrap()
.timestamp()
.build()
.unwrap(),
);
let expected_schema = SchemaBuilder::new()
.field("b", DataType::Boolean)
.unwrap()
.timestamp()
.build()
.unwrap();
let batch =
RecordBatch::try_from_iter(vec![("b", int64_array()), ("time", ts_array())]).unwrap();
@ -2223,7 +2218,7 @@ mod tests {
tombstone_max_sequence_number,
None,
)
.try_add_chunk(ChunkId::new(), Arc::clone(&expected_schema), vec![batch])
.try_add_chunk(ChunkId::new(), expected_schema, vec![batch])
.unwrap_err();
assert_matches!(err, Error::RecordBatchType { .. });

View File

@ -91,18 +91,17 @@ impl IngesterConnection for MockIngesterConnection {
let total_row_count =
batches.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
let summary =
create_basic_summary(total_row_count, &new_schema, ic.ts_min_max);
super::IngesterChunk {
chunk_id: ic.chunk_id,
partition_id: ic.partition_id,
schema: Arc::new(new_schema.clone()),
schema: new_schema,
partition_sort_key: ic.partition_sort_key,
batches,
ts_min_max: ic.ts_min_max,
summary: Arc::new(create_basic_summary(
total_row_count,
&new_schema,
ic.ts_min_max,
)),
summary: Arc::new(summary),
}
})
.collect::<Vec<_>>();

View File

@ -72,7 +72,7 @@ impl QuerierNamespace {
namespace_retention_period: ns.retention_period,
table_id: cached_table.id,
table_name: Arc::clone(table_name),
schema: Arc::clone(&cached_table.schema),
schema: cached_table.schema.clone(),
ingester_connection: ingester_connection.clone(),
chunk_adapter: Arc::clone(&chunk_adapter),
exec: Arc::clone(&exec),
@ -184,7 +184,7 @@ mod tests {
let qns = querier_namespace(&ns).await;
let expected_schema = SchemaBuilder::new().build().unwrap();
let actual_schema = schema(&qns, "table");
assert_eq!(actual_schema.as_ref(), &expected_schema,);
assert_eq!(actual_schema, &expected_schema,);
table.create_column("col1", ColumnType::I64).await;
table.create_column("col2", ColumnType::Bool).await;
@ -197,7 +197,7 @@ mod tests {
.build()
.unwrap();
let actual_schema = schema(&qns, "table");
assert_eq!(actual_schema.as_ref(), &expected_schema,);
assert_eq!(actual_schema, &expected_schema);
table.create_column("col4", ColumnType::Tag).await;
table
@ -213,7 +213,7 @@ mod tests {
.build()
.unwrap();
let actual_schema = schema(&qns, "table");
assert_eq!(actual_schema.as_ref(), &expected_schema,);
assert_eq!(actual_schema, &expected_schema);
}
fn sorted<T>(mut v: Vec<T>) -> Vec<T>
@ -234,7 +234,7 @@ mod tests {
)
}
fn schema(querier_namespace: &QuerierNamespace, table: &str) -> Arc<Schema> {
Arc::clone(querier_namespace.tables.get(table).unwrap().schema())
fn schema<'a>(querier_namespace: &'a QuerierNamespace, table: &str) -> &'a Schema {
querier_namespace.tables.get(table).unwrap().schema()
}
}

View File

@ -31,8 +31,8 @@ impl QueryNamespaceMeta for QuerierNamespace {
names
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.tables.get(table_name).map(|t| Arc::clone(t.schema()))
fn table_schema(&self, table_name: &str) -> Option<Schema> {
self.tables.get(table_name).map(|t| t.schema().clone())
}
}

View File

@ -84,11 +84,7 @@ impl ChunkAdapter {
.collect();
// Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks
let keeps = match prune_summaries(
Arc::clone(&cached_table.schema),
&basic_summaries,
predicate,
) {
let keeps = match prune_summaries(&cached_table.schema, &basic_summaries, predicate) {
Ok(keeps) => keeps,
Err(reason) => {
// Ignore pruning failures here - the chunk pruner should have already logged them.

View File

@ -6,7 +6,7 @@ use data_types::{
};
use iox_query::util::create_basic_summary;
use parquet_file::chunk::ParquetChunk;
use schema::{sort::SortKey, Schema};
use schema::sort::SortKey;
use std::sync::Arc;
mod creation;
@ -84,9 +84,6 @@ pub struct QuerierParquetChunk {
/// Immutable chunk metadata
meta: Arc<QuerierParquetChunkMeta>,
/// Schema of the chunk
schema: Arc<Schema>,
/// Delete predicates to be combined with the chunk
delete_predicates: Vec<Arc<DeletePredicate>>,
@ -107,11 +104,9 @@ impl QuerierParquetChunk {
meta: Arc<QuerierParquetChunkMeta>,
partition_sort_key: Option<Arc<SortKey>>,
) -> Self {
let schema = parquet_chunk.schema();
let table_summary = Arc::new(create_basic_summary(
parquet_chunk.rows() as u64,
&parquet_chunk.schema(),
parquet_chunk.schema(),
parquet_chunk.timestamp_min_max(),
));
@ -119,7 +114,6 @@ impl QuerierParquetChunk {
meta,
delete_predicates: Vec::new(),
partition_sort_key,
schema,
parquet_chunk,
table_summary,
}
@ -341,7 +335,7 @@ pub mod tests {
.build()
.unwrap();
let actual_schema = chunk.schema();
assert_eq!(actual_schema.as_ref(), &expected_schema);
assert_eq!(actual_schema, &expected_schema);
}
fn assert_sort_key(chunk: &QuerierParquetChunk) {

View File

@ -14,8 +14,8 @@ impl QueryChunkMeta for QuerierParquetChunk {
Arc::clone(&self.table_summary)
}
fn schema(&self) -> Arc<Schema> {
Arc::clone(&self.schema)
fn schema(&self) -> &Schema {
self.parquet_chunk.schema()
}
fn partition_sort_key(&self) -> Option<&SortKey> {

View File

@ -76,7 +76,7 @@ pub struct QuerierTableArgs {
pub namespace_retention_period: Option<Duration>,
pub table_id: TableId,
pub table_name: Arc<str>,
pub schema: Arc<Schema>,
pub schema: Schema,
pub ingester_connection: Option<Arc<dyn IngesterConnection>>,
pub chunk_adapter: Arc<ChunkAdapter>,
pub exec: Arc<Executor>,
@ -106,7 +106,7 @@ pub struct QuerierTable {
table_id: TableId,
/// Table schema.
schema: Arc<Schema>,
schema: Schema,
/// Connection to ingester
ingester_connection: Option<Arc<dyn IngesterConnection>>,
@ -165,7 +165,7 @@ impl QuerierTable {
}
/// Schema.
pub fn schema(&self) -> &Arc<Schema> {
pub fn schema(&self) -> &Schema {
&self.schema
}
@ -372,7 +372,7 @@ impl QuerierTable {
.prune_chunks(
self.table_name(),
// use up-to-date schema
Arc::clone(&cached_table.schema),
&cached_table.schema,
chunks,
&predicate,
)
@ -848,7 +848,7 @@ mod tests {
let schema = make_schema_two_fields_two_tags(&table).await;
// let's add a partition from the ingester
let builder = IngesterPartitionBuilder::new(&schema, &shard, &partition)
let builder = IngesterPartitionBuilder::new(schema, &shard, &partition)
.with_lp(["table,tag1=val1,tag2=val2 foo=3,bar=4 11"]);
let ingester_partition =
@ -945,7 +945,7 @@ mod tests {
.with_compaction_level(CompactionLevel::FileNonOverlapped);
partition.create_parquet_file(builder).await;
let builder = IngesterPartitionBuilder::new(&schema, &shard, &partition);
let builder = IngesterPartitionBuilder::new(schema, &shard, &partition);
let ingester_partition =
builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(1)));
@ -1015,18 +1015,16 @@ mod tests {
.create_tombstone(12, 1, 100, "foo=3")
.await;
let schema = Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
);
let schema = SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap();
let ingester_chunk_id1 = u128::MAX - 1;
let builder1 = IngesterPartitionBuilder::new(&schema, &shard, &partition1);
let builder2 = IngesterPartitionBuilder::new(&schema, &shard, &partition2);
let builder1 = IngesterPartitionBuilder::new(schema.clone(), &shard, &partition1);
let builder2 = IngesterPartitionBuilder::new(schema, &shard, &partition2);
let querier_table = TestQuerierTable::new(&catalog, &table)
.await
.with_ingester_partition(
@ -1111,16 +1109,14 @@ mod tests {
let partition1 = table.with_shard(&shard).create_partition("k1").await;
let partition2 = table.with_shard(&shard).create_partition("k2").await;
let schema = Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap(),
);
let schema = SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Integer)
.timestamp()
.build()
.unwrap();
let builder1 = IngesterPartitionBuilder::new(&schema, &shard, &partition1);
let builder2 = IngesterPartitionBuilder::new(&schema, &shard, &partition2);
let builder1 = IngesterPartitionBuilder::new(schema.clone(), &shard, &partition1);
let builder2 = IngesterPartitionBuilder::new(schema, &shard, &partition2);
let querier_table = TestQuerierTable::new(&catalog, &table)
.await
@ -1172,7 +1168,7 @@ mod tests {
let schema = make_schema(&table).await;
let builder =
IngesterPartitionBuilder::new(&schema, &shard, &partition).with_lp(["table foo=1 1"]);
IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
// Parquet file between with max sequence number 2
let pf_builder = TestParquetFileBuilder::default()
@ -1230,7 +1226,7 @@ mod tests {
let querier_table = TestQuerierTable::new(&catalog, &table).await;
let builder =
IngesterPartitionBuilder::new(&schema, &shard, &partition).with_lp(["table foo=1 1"]);
IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
// parquet file with max sequence number 1
let pf_builder = TestParquetFileBuilder::default()
@ -1292,7 +1288,7 @@ mod tests {
let querier_table = TestQuerierTable::new(&catalog, &table).await;
let builder =
IngesterPartitionBuilder::new(&schema, &shard, &partition).with_lp(["table foo=1 1"]);
IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
// parquet file with max sequence number 1
let pf_builder = TestParquetFileBuilder::default()
@ -1319,20 +1315,18 @@ mod tests {
}
/// Adds a "foo" column to the table and returns the created schema
async fn make_schema(table: &Arc<TestTable>) -> Arc<Schema> {
async fn make_schema(table: &Arc<TestTable>) -> Schema {
table.create_column("foo", ColumnType::F64).await;
table.create_column("time", ColumnType::Time).await;
// create corresponding schema
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Float)
.timestamp()
.build()
.unwrap(),
)
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Float)
.timestamp()
.build()
.unwrap()
}
async fn make_schema_two_fields_two_tags(table: &Arc<TestTable>) -> Arc<Schema> {
async fn make_schema_two_fields_two_tags(table: &Arc<TestTable>) -> Schema {
table.create_column("time", ColumnType::Time).await;
table.create_column("foo", ColumnType::F64).await;
table.create_column("bar", ColumnType::F64).await;
@ -1340,16 +1334,14 @@ mod tests {
table.create_column("tag2", ColumnType::Tag).await;
// create corresponding schema
Arc::new(
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Float)
.influx_field("bar", InfluxFieldType::Float)
.tag("tag1")
.tag("tag2")
.timestamp()
.build()
.unwrap(),
)
SchemaBuilder::new()
.influx_field("foo", InfluxFieldType::Float)
.influx_field("bar", InfluxFieldType::Float)
.tag("tag1")
.tag("tag2")
.timestamp()
.build()
.unwrap()
}
/// A `QuerierTable` and some number of `IngesterPartitions` that

View File

@ -54,7 +54,7 @@ impl TableProvider for QuerierTable {
let mut builder = ProviderBuilder::new(
Arc::clone(self.table_name()),
Arc::clone(self.schema()),
self.schema().clone(),
iox_ctx,
);
@ -108,7 +108,7 @@ impl ChunkPruner for QuerierTableChunkPruner {
fn prune_chunks(
&self,
_table_name: &str,
table_schema: Arc<Schema>,
table_schema: &Schema,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Result<Vec<Arc<dyn QueryChunk>>, ProviderError> {

View File

@ -29,7 +29,7 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
.await
.unwrap();
let schema = catalog_schema.tables.remove(&table.table.name).unwrap();
let schema = Arc::new(Schema::try_from(schema).unwrap());
let schema = Schema::try_from(schema).unwrap();
let namespace_name = Arc::from(table.namespace.namespace.name.as_str());
@ -63,7 +63,7 @@ pub(crate) fn lp_to_record_batch(lp: &str) -> RecordBatch {
/// Helper for creating IngesterPartitions
#[derive(Debug, Clone)]
pub(crate) struct IngesterPartitionBuilder {
schema: Arc<Schema>,
schema: Schema,
shard: Arc<TestShard>,
partition: Arc<TestPartition>,
ingester_name: Arc<str>,
@ -77,12 +77,12 @@ pub(crate) struct IngesterPartitionBuilder {
impl IngesterPartitionBuilder {
pub(crate) fn new(
schema: &Arc<Schema>,
schema: Schema,
shard: &Arc<TestShard>,
partition: &Arc<TestPartition>,
) -> Self {
Self {
schema: Arc::clone(schema),
schema,
shard: Arc::clone(shard),
partition: Arc::clone(partition),
ingester_name: Arc::from("ingester1"),
@ -135,7 +135,7 @@ impl IngesterPartitionBuilder {
)
.try_add_chunk(
ChunkId::new_test(self.ingester_chunk_id),
Arc::clone(&self.schema),
self.schema.clone(),
data,
)
.unwrap()

View File

@ -1,4 +1,4 @@
use std::{collections::HashSet, sync::Arc};
use std::collections::HashSet;
use crate::Schema;
@ -10,7 +10,7 @@ use crate::Schema;
/// [Interning]: https://en.wikipedia.org/wiki/Interning_(computer_science)
#[derive(Debug, Default)]
pub struct SchemaInterner {
schemas: HashSet<Arc<Schema>>,
schemas: HashSet<Schema>,
}
impl SchemaInterner {
@ -20,12 +20,11 @@ impl SchemaInterner {
}
/// Intern schema.
pub fn intern(&mut self, schema: Schema) -> Arc<Schema> {
pub fn intern(&mut self, schema: Schema) -> Schema {
if let Some(schema) = self.schemas.get(&schema) {
Arc::clone(schema)
schema.clone()
} else {
let schema = Arc::new(schema);
self.schemas.insert(Arc::clone(&schema));
self.schemas.insert(schema.clone());
schema
}
}
@ -34,6 +33,7 @@ impl SchemaInterner {
#[cfg(test)]
mod tests {
use crate::builder::SchemaBuilder;
use std::sync::Arc;
use super::*;
@ -46,12 +46,12 @@ mod tests {
let schema_2 = SchemaBuilder::new().tag("t1").tag("t3").build().unwrap();
let interned_1a = interner.intern(schema_1a.clone());
assert_eq!(interned_1a.as_ref(), &schema_1a);
assert_eq!(interned_1a, schema_1a);
let interned_1b = interner.intern(schema_1b);
assert!(Arc::ptr_eq(&interned_1a, &interned_1b));
assert!(Arc::ptr_eq(interned_1a.inner(), interned_1b.inner()));
let interned_2 = interner.intern(schema_2.clone());
assert_eq!(interned_2.as_ref(), &schema_2);
assert_eq!(interned_2, schema_2);
}
}
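// A minimal usage sketch (assuming, as the `inner()` checks above suggest, that
// cloning a `Schema` only bumps an internal `Arc`): interning independently built
// but structurally equal schemas leaves every returned value sharing one underlying
// Arrow schema, so the interner stays cheap without handing out `Arc<Schema>`.
fn intern_equal_schemas_sketch() {
    use crate::builder::SchemaBuilder;
    use std::sync::Arc;

    let mut interner = SchemaInterner::default();
    let schemas: Vec<Schema> = (0..3)
        .map(|_| interner.intern(SchemaBuilder::new().tag("t1").tag("t2").build().unwrap()))
        .collect();

    // Every interned value points at the schema stored on the first call.
    assert!(Arc::ptr_eq(schemas[0].inner(), schemas[1].inner()));
    assert!(Arc::ptr_eq(schemas[0].inner(), schemas[2].inner()));
}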

View File

@ -44,7 +44,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// This is infallible because the schemas of chunks within a
/// partition are assumed to be compatible because that schema was
/// enforced as part of writing into the partition
pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Arc<Schema> {
pub fn merge_record_batch_schemas(batches: &[Arc<RecordBatch>]) -> Schema {
let mut merger = SchemaMerger::new();
for batch in batches {
let schema = Schema::try_from(batch.schema()).expect("Schema conversion error");
@ -154,7 +154,7 @@ impl<'a> SchemaMerger<'a> {
}
/// Returns the schema that was built; the columns are always sorted in lexicographic order
pub fn build(mut self) -> Arc<Schema> {
pub fn build(mut self) -> Schema {
let schema = Schema::new_from_parts(
self.measurement.take(),
self.fields.drain().map(|x| x.1),
@ -165,7 +165,7 @@ impl<'a> SchemaMerger<'a> {
if let Some(interner) = self.interner.as_mut() {
interner.intern(schema)
} else {
Arc::new(schema)
schema
}
}
}
@ -198,8 +198,8 @@ mod tests {
.unwrap()
.build();
assert_eq!(merged_schema.as_ref(), &schema1);
assert_eq!(merged_schema.as_ref(), &schema2);
assert_eq!(merged_schema, schema1);
assert_eq!(merged_schema, schema2);
}
#[test]
@ -239,11 +239,9 @@ mod tests {
.sort_fields_by_name();
assert_eq!(
&expected_schema,
merged_schema.as_ref(),
expected_schema, merged_schema,
"\nExpected:\n{:#?}\nActual:\n{:#?}",
expected_schema,
merged_schema
expected_schema, merged_schema
);
}
@ -269,11 +267,9 @@ mod tests {
.unwrap();
assert_eq!(
&expected_schema,
merged_schema.as_ref(),
expected_schema, merged_schema,
"\nExpected:\n{:#?}\nActual:\n{:#?}",
expected_schema,
merged_schema
expected_schema, merged_schema
);
}
@ -303,11 +299,9 @@ mod tests {
.unwrap();
assert_eq!(
&expected_schema,
merged_schema.as_ref(),
expected_schema, merged_schema,
"\nExpected:\n{:#?}\nActual:\n{:#?}",
expected_schema,
merged_schema
expected_schema, merged_schema
);
}
@ -335,11 +329,9 @@ mod tests {
.unwrap();
assert_eq!(
&expected_schema,
merged_schema.as_ref(),
expected_schema, merged_schema,
"\nExpected:\n{:#?}\nActual:\n{:#?}",
expected_schema,
merged_schema
expected_schema, merged_schema
);
}
@ -428,6 +420,9 @@ mod tests {
.build();
assert_eq!(merged_schema_a, merged_schema_b);
assert!(Arc::ptr_eq(&merged_schema_a, &merged_schema_b));
assert!(Arc::ptr_eq(
merged_schema_a.inner(),
merged_schema_b.inner()
));
}
}
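// A minimal usage sketch of the value-returning API above (assuming this sits in the
// same module, so `SchemaMerger` and `Schema` are in scope): `build()` now hands back
// a plain `Schema`, so callers merge and then clone or borrow it directly instead of
// wrapping it in an `Arc<Schema>`.
fn merge_two_sketch(schema_a: &Schema, schema_b: &Schema) -> Schema {
    SchemaMerger::new()
        .merge(schema_a)
        .expect("schemas compatible")
        .merge(schema_b)
        .expect("schemas compatible")
        .build()
}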

View File

@ -894,7 +894,7 @@ mod tests {
use generated_types::node::Type as RPCNodeType;
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
use schema::{Schema, SchemaBuilder};
use std::{collections::BTreeSet, sync::Arc};
use std::collections::BTreeSet;
use test_helpers::assert_contains;
use super::*;
@ -915,7 +915,7 @@ mod tests {
self.table_names.clone()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
fn table_schema(&self, table_name: &str) -> Option<Schema> {
match table_name {
"foo" => {
let schema = SchemaBuilder::new()
@ -929,7 +929,7 @@ mod tests {
.build()
.unwrap();
Some(Arc::new(schema))
Some(schema)
}
"bar" => {
let schema = SchemaBuilder::new()
@ -939,7 +939,7 @@ mod tests {
.build()
.unwrap();
Some(Arc::new(schema))
Some(schema)
}
_ => None,
}