From 7729494f61fbdaed8d6c9a18b5726ea64b811bb3 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 18 Oct 2022 19:37:12 +0200 Subject: [PATCH] test: write, query & progress API coverage This commit adds a new test that exercises all major external APIs of the ingester: * Writing data via the write buffer * Waiting for data to be readable via the progress API * Querying data and asserting the contents This should provide basic integration coverage for the Ingester internals. This commit also removes a similar test (though with less coverage) that was tightly coupled to the existing buffering structures. --- ingester/src/handler.rs | 266 ++-------------------------------------- ingester/tests/write.rs | 119 ++++++++++++++++++ 2 files changed, 127 insertions(+), 258 deletions(-) create mode 100644 ingester/tests/write.rs diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 5c3ae23fba..27d378c09d 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -435,11 +435,10 @@ impl Drop for IngestHandlerImpl { mod tests { use std::{num::NonZeroU32, ops::DerefMut}; - use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber}; + use data_types::{Namespace, NamespaceSchema, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlWrite}; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; use iox_time::Time; - use metric::{Attributes, Metric, U64Counter, U64Gauge}; use mutable_batch_lp::lines_to_batches; use object_store::memory::InMemory; use test_helpers::maybe_start_logging; @@ -448,179 +447,9 @@ mod tests { use super::*; use crate::data::{partition::SnapshotBatch, table::TableName}; - #[tokio::test] - async fn read_from_write_buffer_write_to_mutable_buffer() { - let ingester = TestIngester::new().await; - - let schema = NamespaceSchema::new( - ingester.namespace.id, - ingester.topic.id, - ingester.query_pool.id, - 100, - ); - let mut txn = ingester.catalog.start_transaction().await.unwrap(); - let ingest_ts1 = 
Time::from_timestamp_millis(42); - let ingest_ts2 = Time::from_timestamp_millis(1337); - let w1 = DmlWrite::new( - "foo", - lines_to_batches("mem foo=1 10", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(0)), - ingest_ts1, - None, - 50, - ), - ); - let schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - ingester.write_buffer_state.push_write(w1); - let w2 = DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 20\ncpu bar=3 30", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(7)), - ingest_ts2, - None, - 150, - ), - ); - let _schema = validate_or_insert_schema(w2.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - ingester.write_buffer_state.push_write(w2); - let w3 = DmlWrite::new( - "foo", - lines_to_batches("a b=2 200", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(9)), - ingest_ts2, - None, - 150, - ), - ); - let _schema = validate_or_insert_schema(w3.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - ingester.write_buffer_state.push_write(w3); - txn.commit().await.unwrap(); - - // give the writes some time to go through the buffer. Exit once we've verified there's - // data in there from both writes. 
- tokio::time::timeout(Duration::from_secs(2), async { - let ns_name = ingester.namespace.name.into(); - let table_name = TableName::from("a"); - loop { - let mut has_measurement = false; - - if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) { - if let Some(data) = data.namespace(&ns_name) { - // verify there's data in the buffer - if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await - { - if let Some(b) = b.first() { - if b.data.num_rows() > 0 { - has_measurement = true; - } - } - } - } - } - - // and ensure that the shard state was actually updated - let shard = ingester - .catalog - .repositories() - .await - .shards() - .create_or_get(&ingester.topic, ingester.shard_index) - .await - .unwrap(); - - if has_measurement - && shard.min_unpersisted_sequence_number == SequenceNumber::new(9) - { - break; - } - - tokio::time::sleep(Duration::from_millis(200)).await; - } - }) - .await - .expect("timeout"); - - let observation = ingester - .metrics - .get_instrument::>("ingester_op_apply_duration") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ("result", "success"), - ])) - .unwrap() - .fetch(); - let hits = observation.buckets.iter().map(|b| b.count).sum::(); - assert_eq!(hits, 3); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_read_bytes") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 350); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_last_sequence_number") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 9); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_sequence_number_lag") - .unwrap() - 
.get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 0); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_last_ingest_ts") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64); - } - #[tokio::test] async fn test_shutdown() { - let ingester = TestIngester::new().await.ingester; + let (ingester, _, _) = ingester_test_setup(vec![], 0, true).await; // does not exit w/o shutdown tokio::select! { @@ -638,7 +467,7 @@ mod tests { #[tokio::test] #[should_panic(expected = "Background worker 'bad_task' exited early!")] async fn test_join_task_early_shutdown() { - let mut ingester = TestIngester::new().await.ingester; + let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; let shutdown_task = tokio::spawn(async { // It does nothing! and stops. @@ -655,7 +484,7 @@ mod tests { #[tokio::test] #[should_panic(expected = "JoinError::Panic")] async fn test_join_task_panic() { - let mut ingester = TestIngester::new().await.ingester; + let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; let shutdown_task = tokio::spawn(async { panic!("bananas"); @@ -946,7 +775,7 @@ mod tests { #[tokio::test] async fn limits_concurrent_queries() { - let mut ingester = TestIngester::new().await; + let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; let request = IngesterQueryRequest { namespace: "foo".to_string(), table: "cpu".to_string(), @@ -954,93 +783,14 @@ mod tests { predicate: None, }; - let res = ingester.ingester.query(request.clone()).await.unwrap_err(); + let res = ingester.query(request.clone()).await.unwrap_err(); assert!(matches!( res, crate::querier_handler::Error::NamespaceNotFound { .. 
} )); - ingester.ingester.request_sem = Semaphore::new(0); - let res = ingester.ingester.query(request).await.unwrap_err(); + ingester.request_sem = Semaphore::new(0); + let res = ingester.query(request).await.unwrap_err(); assert!(matches!(res, crate::querier_handler::Error::RequestLimit)); } - - struct TestIngester { - catalog: Arc, - shard: Shard, - namespace: Namespace, - topic: TopicMetadata, - shard_index: ShardIndex, - query_pool: QueryPool, - metrics: Arc, - write_buffer_state: MockBufferSharedState, - ingester: IngestHandlerImpl, - } - - impl TestIngester { - async fn new() -> Self { - let metrics: Arc = Default::default(); - let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - - let mut txn = catalog.start_transaction().await.unwrap(); - let topic = txn.topics().create_or_get("whatevs").await.unwrap(); - let query_pool = txn.query_pools().create_or_get("whatevs").await.unwrap(); - let shard_index = ShardIndex::new(0); - let namespace = txn - .namespaces() - .create("foo", "inf", topic.id, query_pool.id) - .await - .unwrap(); - let shard = txn - .shards() - .create_or_get(&topic, shard_index) - .await - .unwrap(); - txn.commit().await.unwrap(); - - let mut shard_states = BTreeMap::new(); - shard_states.insert(shard_index, shard); - - let write_buffer_state = - MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap()); - let reading: Arc = - Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()); - let object_store = Arc::new(InMemory::new()); - - let lifecycle_config = LifecycleConfig::new( - 1000000, - 1000, - 1000, - Duration::from_secs(10), - Duration::from_secs(10), - 10000000, - ); - let ingester = IngestHandlerImpl::new( - lifecycle_config, - topic.clone(), - shard_states, - Arc::clone(&catalog), - object_store, - reading, - Arc::new(Executor::new(1)), - Arc::clone(&metrics), - false, - 1, - ) - .await - .unwrap(); - - Self { - catalog, - shard, - namespace, - topic, - shard_index, - 
query_pool, - metrics, - write_buffer_state, - ingester, - } - } - } } diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs new file mode 100644 index 0000000000..243cb75ea2 --- /dev/null +++ b/ingester/tests/write.rs @@ -0,0 +1,119 @@ +mod common; + +use arrow_util::assert_batches_sorted_eq; +pub use common::*; +use data_types::PartitionKey; +use generated_types::ingester::IngesterQueryRequest; +use iox_time::{SystemProvider, TimeProvider}; +use metric::{DurationHistogram, U64Counter, U64Gauge}; + +// Write data to an ingester through the write buffer interface, utilise the +// progress API to wait for it to become readable, and finally query the data +// and validate the contents. +#[tokio::test] +async fn test_write_query() { + let mut ctx = TestContext::new().await; + + ctx.ensure_namespace("test_namespace").await; + + // Initial write + let partition_key = PartitionKey::from("1970-01-01"); + ctx.write_lp( + "test_namespace", + "bananas greatness=\"unbounded\" 10", + partition_key.clone(), + 0, + ) + .await; + + // A subsequent write with a non-contiguous sequence number to a different table. + ctx.write_lp( + "test_namespace", + "cpu bar=2 20\ncpu bar=3 30", + partition_key.clone(), + 7, + ) + .await; + + // And a third write that appends more data to the table in the initial + // write. + let offset = ctx + .write_lp( + "test_namespace", + "bananas count=42 200", + partition_key.clone(), + 42, + ) + .await; + + ctx.wait_for_readable(offset).await; + + // Perform a query to validate the actual data buffered. 
+ let data = ctx + .query(IngesterQueryRequest { + namespace: "test_namespace".to_string(), + table: "bananas".to_string(), + columns: vec![], + predicate: None, + }) + .await + .expect("query should succeed") + .into_record_batches() + .await; + + let expected = vec![ + "+-------+-----------+--------------------------------+", + "| count | greatness | time |", + "+-------+-----------+--------------------------------+", + "| | unbounded | 1970-01-01T00:00:00.000000010Z |", + "| 42 | | 1970-01-01T00:00:00.000000200Z |", + "+-------+-----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + // Assert various ingest metrics. + let hist = ctx + .get_metric::( + "ingester_op_apply_duration", + &[ + ("kafka_topic", TEST_TOPIC_NAME), + ("kafka_partition", "0"), + ("result", "success"), + ], + ) + .fetch(); + assert_eq!(hist.sample_count(), 3); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_read_bytes", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + assert_eq!(metric, 150); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_last_sequence_number", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + assert_eq!(metric, 42); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_sequence_number_lag", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + assert_eq!(metric, 0); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_last_ingest_ts", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + let now = SystemProvider::new().now(); + assert!(metric < now.timestamp_nanos() as _); +}