refactor: Extract an ingester verification function

pull/24376/head
Carol (Nichols || Goulding) 2022-05-20 11:54:43 -04:00
parent 2aa76622c3
commit 549dd497ea
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 53 additions and 66 deletions

View File

@ -352,6 +352,7 @@ impl<T> Drop for IngestHandlerImpl<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::data::SnapshotBatch;
use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
@ -636,6 +637,44 @@ mod tests {
(ingester, sequencer, namespace)
}
async fn verify_ingester_buffer_has_data(
ingester: IngestHandlerImpl,
sequencer: Sequencer,
namespace: Namespace,
custom_batch_verification: impl Fn(&SnapshotBatch) + Send,
) {
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there
tokio::time::timeout(Duration::from_secs(1), async move {
loop {
let mut has_measurement = false;
if let Some(data) = ingester.data.sequencer(sequencer.id) {
if let Some(data) = data.namespace(&namespace.name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("cpu", "1970-01-01").await {
if let Some(b) = b.first() {
custom_batch_verification(b);
if b.data.num_rows() == 1 {
has_measurement = true;
}
}
}
}
}
if has_measurement {
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("timeout");
}
#[tokio::test]
async fn seeks_on_initialization() {
let ingest_ts1 = Time::from_timestamp_millis(42);
@ -656,41 +695,15 @@ mod tests {
let (ingester, sequencer, namespace) =
ingester_test_setup(write_operations, 2, false).await;
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there
tokio::time::timeout(Duration::from_secs(1), async {
loop {
let mut has_measurement = false;
if let Some(data) = ingester.data.sequencer(sequencer.id) {
if let Some(data) = data.namespace(&namespace.name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("cpu", "1970-01-01").await {
if let Some(b) = b.first() {
if b.min_sequencer_number == SequenceNumber::new(1) {
panic!(
"initialization did a seek to the beginning rather than \
the min_unpersisted"
);
}
if b.data.num_rows() == 1 {
has_measurement = true;
}
}
}
}
}
if has_measurement {
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| {
if first_batch.min_sequencer_number == SequenceNumber::new(1) {
panic!(
"initialization did a seek to the beginning rather than \
the min_unpersisted"
);
}
})
.await
.expect("timeout");
.await;
}
#[tokio::test]
@ -720,40 +733,14 @@ mod tests {
let (ingester, sequencer, namespace) =
ingester_test_setup(write_operations, 10, true).await;
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there
tokio::time::timeout(Duration::from_secs(1), async {
loop {
let mut has_measurement = false;
if let Some(data) = ingester.data.sequencer(sequencer.id) {
if let Some(data) = data.namespace(&namespace.name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("cpu", "1970-01-01").await {
if let Some(b) = b.first() {
assert_eq!(
b.min_sequencer_number,
SequenceNumber::new(1),
"re-initialization didn't seek to the beginning",
);
if b.data.num_rows() == 1 {
has_measurement = true;
}
}
}
}
}
if has_measurement {
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| {
assert_eq!(
first_batch.min_sequencer_number,
SequenceNumber::new(1),
"re-initialization didn't seek to the beginning",
);
})
.await
.expect("timeout");
.await;
}
struct TestIngester {