fix: order chunks correctly during query processing

The query processing was implicitly relying on the order provided by the
catalog. This had two issues:

- this ordering was not defined in the API contract (neither via docs
  nor via typing)
- the order was based on chunk IDs, which is not adequate in some cases
  (e.g. when chunks are created while a persistence operation is in
  progress)

Now we explicitly sort chunks by `(order, ID)`.

Fixes #1963.
Marco Neumann 2021-09-08 09:37:25 +02:00
parent 8a531be05b
commit 1b788732da
8 changed files with 120 additions and 0 deletions
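The core of the fix is a deterministic two-level sort key. A minimal sketch, assuming a simplified stand-in `Chunk` struct (the real change goes through the `QueryChunk` trait, as the diffs below show):

```rust
/// Hypothetical stand-in for a catalog chunk; the real type exposes
/// `order()` and `id()` via the `QueryChunk` trait.
#[derive(Debug)]
struct Chunk {
    id: u32,
    order: u32,
}

/// Sort chunks by order first, breaking ties by ID so that chunks
/// sharing an order still come out in a deterministic sequence.
fn sort_for_query(mut chunks: Vec<Chunk>) -> Vec<Chunk> {
    chunks.sort_unstable_by_key(|c| (c.order, c.id));
    chunks
}

fn main() {
    let sorted = sort_for_query(vec![
        Chunk { id: 2, order: 0 }, // e.g. created by persistence, low order
        Chunk { id: 1, order: 1 }, // e.g. concurrent write, higher order
    ]);
    // Sorting by ID alone would yield chunks [1, 2]; chunk order yields [2, 1].
    assert_eq!(sorted[0].id, 2);
    assert_eq!(sorted[1].id, 1);
}
```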


@@ -150,6 +150,9 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
    /// Returns chunk type which is either MUB, RUB, OS
    fn chunk_type(&self) -> &str;

    /// Order of this chunk relative to other overlapping chunks.
    fn order(&self) -> u32;
}

/// Implement ChunkMeta for something wrapped in an Arc (like Chunks often are)


@@ -535,6 +535,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
        // Note that we may need to sort/deduplicate based on tag
        // columns which do not appear in the output

        // We need to sort chunks before creating the execution plan. For that, the chunk order
        // is used. Since the order only sorts overlapping chunks, we also use the chunk ID for
        // deterministic outputs.
        let chunks = {
            let mut chunks = chunks;
            chunks.sort_unstable_by_key(|c| (c.order(), c.id()));
            chunks
        };

        let pk_schema = Self::compute_pk_schema(&chunks);
        let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);


@@ -171,6 +171,9 @@ pub struct TestChunk {
    /// Return value for apply_predicate, if desired
    predicate_match: Option<PredicateMatch>,

    /// Order of this chunk relative to other overlapping chunks.
    order: u32,
}

/// Implements a method for adding a column with default stats
@@ -244,6 +247,7 @@ impl TestChunk {
            table_data: Default::default(),
            saved_error: Default::default(),
            predicate_match: Default::default(),
            order: Default::default(),
        }
    }
@@ -887,6 +891,10 @@ impl QueryChunk for TestChunk {
        Ok(Some(column_names))
    }

    fn order(&self) -> u32 {
        self.order
    }
}

impl QueryChunkMeta for TestChunk {


@@ -0,0 +1,7 @@
-- Test Setup: ChunkOrder
-- SQL: SELECT * from cpu order by time;
+--------+--------------------------------+------+
| region | time                           | user |
+--------+--------------------------------+------+
| west   | 1970-01-01T00:00:00.000000100Z | 2    |
+--------+--------------------------------+------+


@@ -0,0 +1,5 @@
-- Test that deduplication respects chunk ordering
-- IOX_SETUP: ChunkOrder

-- query data
SELECT * from cpu order by time;


@@ -18,6 +18,20 @@ async fn test_cases_all_chunks_dropped_sql() {
        .expect("flush worked");
}

#[tokio::test]
// Tests from "chunk_order.sql",
async fn test_cases_chunk_order_sql() {
    let input_path = Path::new("cases").join("in").join("chunk_order.sql");
    let mut runner = Runner::new();
    runner
        .run(input_path)
        .await
        .expect("test failed");
    runner
        .flush()
        .expect("flush worked");
}

#[tokio::test]
// Tests from "duplicates.sql",
async fn test_cases_duplicates_sql() {


@@ -52,6 +52,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
        register_setup!(TwoMeasurementsManyFieldsOneChunk),
        register_setup!(OneMeasurementThreeChunksWithDuplicates),
        register_setup!(OneMeasurementAllChunksDropped),
        register_setup!(ChunkOrder),
    ]
    .into_iter()
    .map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))
@@ -1110,3 +1111,70 @@ where
        .collect()
    }
}

/// Setup for testing chunk order: chunks where the chunk ID alone cannot be used for ordering.
#[derive(Debug)]
pub struct ChunkOrder {}

#[async_trait]
impl DbSetup for ChunkOrder {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";

        let db = make_db().await.db;

        // create chunk 0: data->MUB->RUB
        write_lp(&db, "cpu,region=west user=1 100").await;
        assert_eq!(count_mutable_buffer_chunks(&db), 1);
        assert_eq!(count_read_buffer_chunks(&db), 0);
        assert_eq!(count_object_store_chunks(&db), 0);

        db.move_chunk_to_read_buffer(table_name, partition_key, 0)
            .await
            .unwrap();
        assert_eq!(count_mutable_buffer_chunks(&db), 0);
        assert_eq!(count_read_buffer_chunks(&db), 1);
        assert_eq!(count_object_store_chunks(&db), 0);

        // create chunk 1: data->MUB
        write_lp(&db, "cpu,region=west user=2 100").await;
        assert_eq!(count_mutable_buffer_chunks(&db), 1);
        assert_eq!(count_read_buffer_chunks(&db), 1);
        assert_eq!(count_object_store_chunks(&db), 0);

        // prevent chunk 1 from being part of the persistence
        // NOTE: In "real life" that could happen when writes happen while a persistence
        // is in progress, but it's easier to trigger w/ this tiny locking trick.
        let lockable_chunk = db.lockable_chunk(table_name, partition_key, 1).unwrap();
        lockable_chunk
            .chunk
            .write()
            .set_dropping(&Default::default())
            .unwrap();

        // transform chunk 0 into chunk 2 by persisting
        db.persist_partition(
            "cpu",
            partition_key,
            Instant::now() + Duration::from_secs(1),
        )
        .await
        .unwrap();
        assert_eq!(count_mutable_buffer_chunks(&db), 1);
        assert_eq!(count_read_buffer_chunks(&db), 1);
        assert_eq!(count_object_store_chunks(&db), 1);

        // unlock chunk 1 again
        lockable_chunk
            .chunk
            .write()
            .clear_lifecycle_action()
            .unwrap();

        let scenario = DbScenario {
            scenario_name: "chunks where chunk ID alone cannot be used for ordering".into(),
            db,
        };

        vec![scenario]
    }
}
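
Why chunk IDs alone fail in this scenario: a sketch of the chunks as `(order, ID)` pairs. It assumes (not spelled out in the diff) that a persisted chunk inherits the order of its source chunk and that the chunk sorting later wins deduplication:

```rust
fn main() {
    // Assumed (order, id) pairs for the scenario above:
    //   chunk 1: write issued during persistence (user=2)
    //   chunk 2: persisted from chunk 0 (user=1), inherits order 0
    let mut chunks = vec![(1u32, 1u32), (0, 2)];

    // By ID alone, chunk 2 sorts last, so the stale row (user=1) would win.
    chunks.sort_unstable_by_key(|&(_, id)| id);
    assert_eq!(chunks, vec![(1, 1), (0, 2)]);

    // By (order, ID), chunk 1 sorts last, so user=2 wins -- matching the
    // expected output above.
    chunks.sort_unstable_by_key(|&(order, id)| (order, id));
    assert_eq!(chunks, vec![(0, 2), (1, 1)]);
}
```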


@@ -85,6 +85,7 @@ pub struct DbChunk {
    meta: Arc<ChunkMetadata>,
    time_of_first_write: DateTime<Utc>,
    time_of_last_write: DateTime<Utc>,
    order: u32,
}

#[derive(Debug)]
@@ -171,6 +172,7 @@ impl DbChunk {
            meta,
            time_of_first_write: chunk.time_of_first_write(),
            time_of_last_write: chunk.time_of_last_write(),
            order: chunk.order(),
        })
    }
@@ -200,6 +202,7 @@ impl DbChunk {
            access_recorder: chunk.access_recorder().clone(),
            time_of_first_write: chunk.time_of_first_write(),
            time_of_last_write: chunk.time_of_last_write(),
            order: chunk.order(),
        })
    }
@@ -487,6 +490,10 @@ impl QueryChunk for DbChunk {
            State::ParquetFile { .. } => "OS",
        }
    }

    fn order(&self) -> u32 {
        self.order
    }
}

impl QueryChunkMeta for DbChunk {