From a227366432142ab74a7868650c8ced3b5e088c6e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 24 Oct 2022 16:36:08 +0000 Subject: [PATCH] refactor: do not project chunks in `TestDatabase::chunks` (#5960) Databases are NOT required to project chunks (in practice this is only done by the querier for ingester-based chunks). Instead `iox_query` should (and already does) add the right stream adapters to project chunks or to create NULL-columns. Removing the special handling from the test setup makes it easier to understand and also less likely that `iox_query` starts to rely on this behavior. Helps with #5897. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_query/src/frontend/influxrpc.rs | 26 ++++++++------- iox_query/src/lib.rs | 8 +++-- iox_query/src/test.rs | 51 +++++++---------------------- 3 files changed, 32 insertions(+), 53 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index a373abb779..3692f42423 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -2074,7 +2074,7 @@ mod tests { executor.join().await; //////////////////////////// - // Test 2: no need_fields --> only PK + columns in predicate are return + // Test 2: no need_fields let need_fields = false; let test_db = Arc::new(TestDatabase::new(Arc::clone(&executor))); @@ -2090,14 +2090,15 @@ mod tests { assert_eq!(result[0].0, "h2o"); // table name assert_eq!(result[0].2.len(), 1); // returned chunks - // chunk schema includes only 3 columns of the table PK + cols in predicate + // chunk schema includes still includes everything (the test table implementation does NOT project chunks) let chunk = &result[0].2[0]; let chunk_schema = (*chunk.schema()).clone(); - assert_eq!(chunk_schema.len(), 3); + assert_eq!(chunk_schema.len(), 5); let chunk_schema = chunk_schema.sort_fields_by_name(); assert_eq!(chunk_schema.field(0).1.name(), "bar"); assert_eq!(chunk_schema.field(1).1.name(), "foo"); - assert_eq!(chunk_schema.field(2).1.name(), TIME_COLUMN_NAME); + assert_eq!(chunk_schema.field(2).1.name(), "i64_field"); + assert_eq!(chunk_schema.field(3).1.name(), "i64_field_2"); executor.join().await; } @@ -2205,7 +2206,6 @@ mod tests { assert_eq!(result[0].2.len(), 1); // returned chunks // Since no data, we do not do pushdown in the test chunk. - // the no-data returned chunk will include all columns of the table let chunk = &result[0].2[0]; let chunk_schema = (*chunk.schema()).clone(); assert_eq!(chunk_schema.len(), 5); @@ -2255,15 +2255,16 @@ mod tests { assert_eq!(result[0].0, "h2o"); // table name assert_eq!(result[0].2.len(), 1); // returned chunks - // chunk schema includes 4 columns: 3 cols of PK plus i64_field_2 + // chunk schema includes everything (test table does NOT perform any projection) let chunk = &result[0].2[0]; let chunk_schema = (*chunk.schema()).clone(); - assert_eq!(chunk_schema.len(), 4); + assert_eq!(chunk_schema.len(), 5); let chunk_schema = chunk_schema.sort_fields_by_name(); assert_eq!(chunk_schema.field(0).1.name(), "bar"); assert_eq!(chunk_schema.field(1).1.name(), "foo"); - assert_eq!(chunk_schema.field(2).1.name(), "i64_field_2"); - assert_eq!(chunk_schema.field(3).1.name(), TIME_COLUMN_NAME); + assert_eq!(chunk_schema.field(2).1.name(), "i64_field"); + assert_eq!(chunk_schema.field(3).1.name(), "i64_field_2"); + assert_eq!(chunk_schema.field(4).1.name(), TIME_COLUMN_NAME); executor.join().await; ///////////// @@ -2287,15 +2288,16 @@ mod tests { assert_eq!(result[0].0, "h2o"); // table name assert_eq!(result[0].2.len(), 1); // returned chunks - // chunk schema includes 4 columns: 3 cols of PK plus i64_field_1 + // chunk schema includes everything (test table does NOT perform any projection) let chunk = &result[0].2[0]; let chunk_schema = (*chunk.schema()).clone(); - assert_eq!(chunk_schema.len(), 4); + assert_eq!(chunk_schema.len(), 5); let chunk_schema = chunk_schema.sort_fields_by_name(); assert_eq!(chunk_schema.field(0).1.name(), "bar"); assert_eq!(chunk_schema.field(1).1.name(), "foo"); assert_eq!(chunk_schema.field(2).1.name(), "i64_field"); - assert_eq!(chunk_schema.field(3).1.name(), TIME_COLUMN_NAME); + assert_eq!(chunk_schema.field(3).1.name(), "i64_field_2"); + assert_eq!(chunk_schema.field(4).1.name(), TIME_COLUMN_NAME); executor.join().await; } diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index 7863e9750f..9672cbac37 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -149,10 +149,14 @@ pub type QueryText = Box; #[async_trait] pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync { /// Returns a set of chunks within the partition with data that may match - /// the provided predicate. If possible, chunks which have no rows that can + /// the provided predicate. + /// + /// If possible, chunks which have no rows that can /// possibly match the predicate may be omitted. + /// /// If projection is None, returned chunks will include all columns of its original data. Otherwise, - /// returned chunks will includs PK columns (tags and time) and columns specified in the projection. + /// returned chunks will include PK columns (tags and time) and columns specified in the projection. Projecting + /// chunks here is optional and a mere optimization. The query subsystem does NOT rely on it. async fn chunks( &self, table_name: &str, diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index e7a0503f1c..9a6d59919f 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -108,54 +108,27 @@ impl QueryDatabase for TestDatabase { &self, table_name: &str, predicate: &Predicate, - projection: &Option>, + _projection: &Option>, _ctx: IOxSessionContext, ) -> Result>, DataFusionError> { // save last predicate *self.chunks_predicate.lock() = predicate.clone(); let partitions = self.partitions.lock().clone(); - let chunks = partitions + Ok(partitions .values() .flat_map(|x| x.values()) - .filter(|x| x.table_name == table_name) + // filter by table + .filter(|c| c.table_name == table_name) + // only keep chunks if their statistics overlap + .filter(|c| { + !matches!( + predicate.apply_to_table_summary(&c.table_summary, c.schema.as_arrow()), + PredicateMatch::Zero + ) + }) .map(|x| Arc::clone(x) as Arc) - .collect::>(); - - // Return chunks with fewer columns if a projection is specified - let mut new_chunks = Vec::with_capacity(chunks.len()); - for c in chunks { - let schema = c.schema(); - let cols = schema.select_given_and_pk_columns(projection); - let cols = cols.iter().map(|c| c.as_str()).collect::>(); - let selection = Selection::Some(&cols); - - let read_result = - c.read_filter(IOxSessionContext::with_testing(), predicate, selection); - if read_result.is_err() { - return Err(read_result.err().unwrap()); - } - let mut stream = read_result.unwrap(); - - let mut new_chunk = TestChunk::new(c.table_name()); - while let Some(b) = stream.next().await { - let b = b.expect("Error in stream"); - new_chunk.table_data.push(Arc::new(b)); - } - - let new_chunk = if !new_chunk.table_data.is_empty() { - let new_schema = Schema::try_from(new_chunk.table_data[0].schema()).unwrap(); - let new_chunk = new_chunk.add_schema_to_table(new_schema, true, None); - Arc::new(new_chunk) as _ - } else { - // No data, return the original empty chunk with the original schema - c - }; - - new_chunks.push(new_chunk); - } - - Ok(new_chunks) + .collect::>()) } fn record_query(