refactor: do not project chunks in `TestDatabase::chunks` (#5960)

Databases are NOT required to project chunks (in practice this is only
done by the querier for ingester-based chunks). Instead `iox_query`
should (and already does) add the right stream adapters to project
chunks or to create NULL-columns. Removing the special handling from the
test setup makes it easier to understand and also less likely that
`iox_query` starts to rely on this behavior.

Helps with #5897.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-10-24 16:36:08 +00:00 committed by GitHub
parent c9b1066b89
commit a227366432
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 53 deletions

View File

@ -2074,7 +2074,7 @@ mod tests {
executor.join().await;
////////////////////////////
// Test 2: no need_fields --> only PK + columns in predicate are returned
// Test 2: no need_fields
let need_fields = false;
let test_db = Arc::new(TestDatabase::new(Arc::clone(&executor)));
@ -2090,14 +2090,15 @@ mod tests {
assert_eq!(result[0].0, "h2o"); // table name
assert_eq!(result[0].2.len(), 1); // returned chunks
// chunk schema includes only 3 columns of the table PK + cols in predicate
// chunk schema still includes everything (the test table implementation does NOT project chunks)
let chunk = &result[0].2[0];
let chunk_schema = (*chunk.schema()).clone();
assert_eq!(chunk_schema.len(), 3);
assert_eq!(chunk_schema.len(), 5);
let chunk_schema = chunk_schema.sort_fields_by_name();
assert_eq!(chunk_schema.field(0).1.name(), "bar");
assert_eq!(chunk_schema.field(1).1.name(), "foo");
assert_eq!(chunk_schema.field(2).1.name(), TIME_COLUMN_NAME);
assert_eq!(chunk_schema.field(2).1.name(), "i64_field");
assert_eq!(chunk_schema.field(3).1.name(), "i64_field_2");
executor.join().await;
}
@ -2205,7 +2206,6 @@ mod tests {
assert_eq!(result[0].2.len(), 1); // returned chunks
// Since no data, we do not do pushdown in the test chunk.
// the no-data returned chunk will include all columns of the table
let chunk = &result[0].2[0];
let chunk_schema = (*chunk.schema()).clone();
assert_eq!(chunk_schema.len(), 5);
@ -2255,15 +2255,16 @@ mod tests {
assert_eq!(result[0].0, "h2o"); // table name
assert_eq!(result[0].2.len(), 1); // returned chunks
// chunk schema includes 4 columns: 3 cols of PK plus i64_field_2
// chunk schema includes everything (test table does NOT perform any projection)
let chunk = &result[0].2[0];
let chunk_schema = (*chunk.schema()).clone();
assert_eq!(chunk_schema.len(), 4);
assert_eq!(chunk_schema.len(), 5);
let chunk_schema = chunk_schema.sort_fields_by_name();
assert_eq!(chunk_schema.field(0).1.name(), "bar");
assert_eq!(chunk_schema.field(1).1.name(), "foo");
assert_eq!(chunk_schema.field(2).1.name(), "i64_field_2");
assert_eq!(chunk_schema.field(3).1.name(), TIME_COLUMN_NAME);
assert_eq!(chunk_schema.field(2).1.name(), "i64_field");
assert_eq!(chunk_schema.field(3).1.name(), "i64_field_2");
assert_eq!(chunk_schema.field(4).1.name(), TIME_COLUMN_NAME);
executor.join().await;
/////////////
@ -2287,15 +2288,16 @@ mod tests {
assert_eq!(result[0].0, "h2o"); // table name
assert_eq!(result[0].2.len(), 1); // returned chunks
// chunk schema includes 4 columns: 3 cols of PK plus i64_field_1
// chunk schema includes everything (test table does NOT perform any projection)
let chunk = &result[0].2[0];
let chunk_schema = (*chunk.schema()).clone();
assert_eq!(chunk_schema.len(), 4);
assert_eq!(chunk_schema.len(), 5);
let chunk_schema = chunk_schema.sort_fields_by_name();
assert_eq!(chunk_schema.field(0).1.name(), "bar");
assert_eq!(chunk_schema.field(1).1.name(), "foo");
assert_eq!(chunk_schema.field(2).1.name(), "i64_field");
assert_eq!(chunk_schema.field(3).1.name(), TIME_COLUMN_NAME);
assert_eq!(chunk_schema.field(3).1.name(), "i64_field_2");
assert_eq!(chunk_schema.field(4).1.name(), TIME_COLUMN_NAME);
executor.join().await;
}

View File

@ -149,10 +149,14 @@ pub type QueryText = Box<dyn std::fmt::Display + Send + Sync>;
#[async_trait]
pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
/// Returns a set of chunks within the partition with data that may match
/// the provided predicate. If possible, chunks which have no rows that can
/// the provided predicate.
///
/// If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted.
///
/// If projection is None, returned chunks will include all columns of its original data. Otherwise,
/// returned chunks will includs PK columns (tags and time) and columns specified in the projection.
/// returned chunks will include PK columns (tags and time) and columns specified in the projection. Projecting
/// chunks here is optional and a mere optimization. The query subsystem does NOT rely on it.
async fn chunks(
&self,
table_name: &str,

View File

@ -108,54 +108,27 @@ impl QueryDatabase for TestDatabase {
&self,
table_name: &str,
predicate: &Predicate,
projection: &Option<Vec<usize>>,
_projection: &Option<Vec<usize>>,
_ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
// save last predicate
*self.chunks_predicate.lock() = predicate.clone();
let partitions = self.partitions.lock().clone();
let chunks = partitions
Ok(partitions
.values()
.flat_map(|x| x.values())
.filter(|x| x.table_name == table_name)
// filter by table
.filter(|c| c.table_name == table_name)
// only keep chunks if their statistics overlap
.filter(|c| {
!matches!(
predicate.apply_to_table_summary(&c.table_summary, c.schema.as_arrow()),
PredicateMatch::Zero
)
})
.map(|x| Arc::clone(x) as Arc<dyn QueryChunk>)
.collect::<Vec<_>>();
// Return chunks with fewer columns if a projection is specified
let mut new_chunks = Vec::with_capacity(chunks.len());
for c in chunks {
let schema = c.schema();
let cols = schema.select_given_and_pk_columns(projection);
let cols = cols.iter().map(|c| c.as_str()).collect::<Vec<_>>();
let selection = Selection::Some(&cols);
let read_result =
c.read_filter(IOxSessionContext::with_testing(), predicate, selection);
if read_result.is_err() {
return Err(read_result.err().unwrap());
}
let mut stream = read_result.unwrap();
let mut new_chunk = TestChunk::new(c.table_name());
while let Some(b) = stream.next().await {
let b = b.expect("Error in stream");
new_chunk.table_data.push(Arc::new(b));
}
let new_chunk = if !new_chunk.table_data.is_empty() {
let new_schema = Schema::try_from(new_chunk.table_data[0].schema()).unwrap();
let new_chunk = new_chunk.add_schema_to_table(new_schema, true, None);
Arc::new(new_chunk) as _
} else {
// No data, return the original empty chunk with the original schema
c
};
new_chunks.push(new_chunk);
}
Ok(new_chunks)
.collect::<Vec<_>>())
}
fn record_query(