test: write, query & progress API coverage

This commit adds a new test that exercises all major external APIs of
the ingester:

    * Writing data via the write buffer
    * Waiting for data to be readable via the progress API
    * Querying data and asserting the contents

This should provide basic integration coverage for the Ingester
internals. This commit also removes a similar test (though with less
coverage) that was tightly coupled to the existing buffering structures.
Dom Dwyer 2022-10-18 19:37:12 +02:00
parent b12d472a17
commit 7729494f61
2 changed files with 127 additions and 258 deletions
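
For reference, the shape of the new end-to-end flow, condensed from the
test added in ingester/tests/write.rs below. TestContext and its helpers
come from the ingester test harness's common module (not shown in this
diff); this is a sketch of the flow, not the full test:

    use data_types::PartitionKey;
    use generated_types::ingester::IngesterQueryRequest;

    let mut ctx = TestContext::new().await;
    ctx.ensure_namespace("test_namespace").await;

    // Write through the write buffer interface; returns the write's offset.
    let offset = ctx
        .write_lp(
            "test_namespace",
            "bananas count=42 200",
            PartitionKey::from("1970-01-01"),
            42, // sequence number
        )
        .await;

    // Block on the progress API until the offset becomes readable.
    ctx.wait_for_readable(offset).await;

    // Query the buffered data and assert on its contents.
    let data = ctx
        .query(IngesterQueryRequest {
            namespace: "test_namespace".to_string(),
            table: "bananas".to_string(),
            columns: vec![],
            predicate: None,
        })
        .await
        .expect("query should succeed")
        .into_record_batches()
        .await;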


@@ -435,11 +435,10 @@ impl<T> Drop for IngestHandlerImpl<T> {
mod tests {
use std::{num::NonZeroU32, ops::DerefMut};
use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber};
use data_types::{Namespace, NamespaceSchema, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
use iox_time::Time;
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use mutable_batch_lp::lines_to_batches;
use object_store::memory::InMemory;
use test_helpers::maybe_start_logging;
@@ -448,179 +447,9 @@ mod tests {
use super::*;
use crate::data::{partition::SnapshotBatch, table::TableName};
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
let ingester = TestIngester::new().await;
let schema = NamespaceSchema::new(
ingester.namespace.id,
ingester.topic.id,
ingester.query_pool.id,
100,
);
let mut txn = ingester.catalog.start_transaction().await.unwrap();
let ingest_ts1 = Time::from_timestamp_millis(42);
let ingest_ts2 = Time::from_timestamp_millis(1337);
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(0)),
ingest_ts1,
None,
50,
),
);
let schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut())
.await
.unwrap()
.unwrap();
ingester.write_buffer_state.push_write(w1);
let w2 = DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20\ncpu bar=3 30", 0).unwrap(),
Some("1970-01-01".into()),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(7)),
ingest_ts2,
None,
150,
),
);
let _schema = validate_or_insert_schema(w2.tables(), &schema, txn.deref_mut())
.await
.unwrap()
.unwrap();
ingester.write_buffer_state.push_write(w2);
let w3 = DmlWrite::new(
"foo",
lines_to_batches("a b=2 200", 0).unwrap(),
Some("1970-01-01".into()),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(9)),
ingest_ts2,
None,
150,
),
);
let _schema = validate_or_insert_schema(w3.tables(), &schema, txn.deref_mut())
.await
.unwrap()
.unwrap();
ingester.write_buffer_state.push_write(w3);
txn.commit().await.unwrap();
// give the writes some time to go through the buffer. Exit once we've verified there's
// data in there from both writes.
tokio::time::timeout(Duration::from_secs(2), async {
let ns_name = ingester.namespace.name.into();
let table_name = TableName::from("a");
loop {
let mut has_measurement = false;
if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) {
if let Some(data) = data.namespace(&ns_name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await
{
if let Some(b) = b.first() {
if b.data.num_rows() > 0 {
has_measurement = true;
}
}
}
}
}
// and ensure that the shard state was actually updated
let shard = ingester
.catalog
.repositories()
.await
.shards()
.create_or_get(&ingester.topic, ingester.shard_index)
.await
.unwrap();
if has_measurement
&& shard.min_unpersisted_sequence_number == SequenceNumber::new(9)
{
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.expect("timeout");
let observation = ingester
.metrics
.get_instrument::<Metric<DurationHistogram>>("ingester_op_apply_duration")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),
("kafka_partition", "0"),
("result", "success"),
]))
.unwrap()
.fetch();
let hits = observation.buckets.iter().map(|b| b.count).sum::<u64>();
assert_eq!(hits, 3);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Counter>>("ingester_write_buffer_read_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),
("kafka_partition", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 350);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("ingester_write_buffer_last_sequence_number")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),
("kafka_partition", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 9);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("ingester_write_buffer_sequence_number_lag")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),
("kafka_partition", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 0);
let observation = ingester
.metrics
.get_instrument::<Metric<U64Gauge>>("ingester_write_buffer_last_ingest_ts")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),
("kafka_partition", "0"),
]))
.unwrap()
.fetch();
assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64);
}
#[tokio::test]
async fn test_shutdown() {
let ingester = TestIngester::new().await.ingester;
let (ingester, _, _) = ingester_test_setup(vec![], 0, true).await;
// does not exit w/o shutdown
tokio::select! {
@@ -638,7 +467,7 @@ mod tests {
#[tokio::test]
#[should_panic(expected = "Background worker 'bad_task' exited early!")]
async fn test_join_task_early_shutdown() {
let mut ingester = TestIngester::new().await.ingester;
let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await;
let shutdown_task = tokio::spawn(async {
// It does nothing! and stops.
@@ -655,7 +484,7 @@ mod tests {
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn test_join_task_panic() {
let mut ingester = TestIngester::new().await.ingester;
let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await;
let shutdown_task = tokio::spawn(async {
panic!("bananas");
@@ -946,7 +775,7 @@ mod tests {
#[tokio::test]
async fn limits_concurrent_queries() {
let mut ingester = TestIngester::new().await;
let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await;
let request = IngesterQueryRequest {
namespace: "foo".to_string(),
table: "cpu".to_string(),
@@ -954,93 +783,14 @@ mod tests {
predicate: None,
};
let res = ingester.ingester.query(request.clone()).await.unwrap_err();
let res = ingester.query(request.clone()).await.unwrap_err();
assert!(matches!(
res,
crate::querier_handler::Error::NamespaceNotFound { .. }
));
ingester.ingester.request_sem = Semaphore::new(0);
let res = ingester.ingester.query(request).await.unwrap_err();
ingester.request_sem = Semaphore::new(0);
let res = ingester.query(request).await.unwrap_err();
assert!(matches!(res, crate::querier_handler::Error::RequestLimit));
}
struct TestIngester {
catalog: Arc<dyn Catalog>,
shard: Shard,
namespace: Namespace,
topic: TopicMetadata,
shard_index: ShardIndex,
query_pool: QueryPool,
metrics: Arc<metric::Registry>,
write_buffer_state: MockBufferSharedState,
ingester: IngestHandlerImpl,
}
impl TestIngester {
async fn new() -> Self {
let metrics: Arc<metric::Registry> = Default::default();
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let mut txn = catalog.start_transaction().await.unwrap();
let topic = txn.topics().create_or_get("whatevs").await.unwrap();
let query_pool = txn.query_pools().create_or_get("whatevs").await.unwrap();
let shard_index = ShardIndex::new(0);
let namespace = txn
.namespaces()
.create("foo", "inf", topic.id, query_pool.id)
.await
.unwrap();
let shard = txn
.shards()
.create_or_get(&topic, shard_index)
.await
.unwrap();
txn.commit().await.unwrap();
let mut shard_states = BTreeMap::new();
shard_states.insert(shard_index, shard);
let write_buffer_state =
MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap());
let reading: Arc<dyn WriteBufferReading> =
Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap());
let object_store = Arc::new(InMemory::new());
let lifecycle_config = LifecycleConfig::new(
1000000,
1000,
1000,
Duration::from_secs(10),
Duration::from_secs(10),
10000000,
);
let ingester = IngestHandlerImpl::new(
lifecycle_config,
topic.clone(),
shard_states,
Arc::clone(&catalog),
object_store,
reading,
Arc::new(Executor::new(1)),
Arc::clone(&metrics),
false,
1,
)
.await
.unwrap();
Self {
catalog,
shard,
namespace,
topic,
shard_index,
query_pool,
metrics,
write_buffer_state,
ingester,
}
}
}
}

ingester/tests/write.rs Normal file

@@ -0,0 +1,119 @@
mod common;
use arrow_util::assert_batches_sorted_eq;
pub use common::*;
use data_types::PartitionKey;
use generated_types::ingester::IngesterQueryRequest;
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, U64Counter, U64Gauge};
// Write data to an ingester through the write buffer interface, utilise the
// progress API to wait for it to become readable, and finally query the data
// and validate the contents.
#[tokio::test]
async fn test_write_query() {
let mut ctx = TestContext::new().await;
ctx.ensure_namespace("test_namespace").await;
// Initial write
let partition_key = PartitionKey::from("1970-01-01");
ctx.write_lp(
"test_namespace",
"bananas greatness=\"unbounded\" 10",
partition_key.clone(),
0,
)
.await;
// A subsequent write with a non-contiguous sequence number to a different table.
ctx.write_lp(
"test_namespace",
"cpu bar=2 20\ncpu bar=3 30",
partition_key.clone(),
7,
)
.await;
// And a third write that appends more data to the table in the initial
// write.
let offset = ctx
.write_lp(
"test_namespace",
"bananas count=42 200",
partition_key.clone(),
42,
)
.await;
ctx.wait_for_readable(offset).await;
// Perform a query to validate the actual data buffered.
let data = ctx
.query(IngesterQueryRequest {
namespace: "test_namespace".to_string(),
table: "bananas".to_string(),
columns: vec![],
predicate: None,
})
.await
.expect("query should succeed")
.into_record_batches()
.await;
let expected = vec![
"+-------+-----------+--------------------------------+",
"| count | greatness | time |",
"+-------+-----------+--------------------------------+",
"| | unbounded | 1970-01-01T00:00:00.000000010Z |",
"| 42 | | 1970-01-01T00:00:00.000000200Z |",
"+-------+-----------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &data);
// Assert various ingest metrics.
let hist = ctx
.get_metric::<DurationHistogram, _>(
"ingester_op_apply_duration",
&[
("kafka_topic", TEST_TOPIC_NAME),
("kafka_partition", "0"),
("result", "success"),
],
)
.fetch();
assert_eq!(hist.sample_count(), 3);
let metric = ctx
.get_metric::<U64Counter, _>(
"ingester_write_buffer_read_bytes",
&[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")],
)
.fetch();
assert_eq!(metric, 150);
let metric = ctx
.get_metric::<U64Gauge, _>(
"ingester_write_buffer_last_sequence_number",
&[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")],
)
.fetch();
assert_eq!(metric, 42);
let metric = ctx
.get_metric::<U64Gauge, _>(
"ingester_write_buffer_sequence_number_lag",
&[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")],
)
.fetch();
assert_eq!(metric, 0);
let metric = ctx
.get_metric::<U64Gauge, _>(
"ingester_write_buffer_last_ingest_ts",
&[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")],
)
.fetch();
let now = SystemProvider::new().now();
assert!(metric < now.timestamp_nanos() as _);
}
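
Assuming the package and test-target names follow the file path (crate
ingester, integration-test target write), the new test can be run on its
own with `cargo test -p ingester --test write`.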