From d0b546109fe6da1132e2dc095511191d38a7eb4c Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 18 Oct 2022 18:05:34 +0200 Subject: [PATCH 1/5] refactor: impl converting IngesterQueryResponse An existing function to map the complex IngesterQueryResponse type to a simple set of RecordBatch existed in test code - this has been lifted onto an inherent method on the response type itself for reuse. --- ingester/src/querier_handler.rs | 142 ++++++++++++++++---------------- 1 file changed, 73 insertions(+), 69 deletions(-) diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index b1f2338a1c..e9e13e72f5 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -2,15 +2,15 @@ use std::{pin::Pin, sync::Arc}; -use arrow::{error::ArrowError, record_batch::RecordBatch}; +use arrow::{array::new_null_array, error::ArrowError, record_batch::RecordBatch}; use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use data_types::{PartitionId, SequenceNumber}; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_util::MemoryStream; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use generated_types::ingester::IngesterQueryRequest; use observability_deps::tracing::debug; -use schema::selection::Selection; +use schema::{merge::SchemaMerger, selection::Selection}; use snafu::{ensure, Snafu}; use crate::{ @@ -168,10 +168,63 @@ impl IngesterQueryResponse { }) .boxed() } + + /// Convert [`IngesterQueryResponse`] to a set of [`RecordBatch`]es. + /// + /// If the response contains multiple snapshots, this will merge the schemas into a single one and create + /// NULL-columns for snapshots that miss columns. + /// + /// # Panic + /// Panics if there are no batches returned at all. Also panics if the snapshot-scoped schemas do not line up with + /// the snapshot-scoped record batches. 
+ pub async fn into_record_batches(self) -> Vec { + let mut snapshot_schema = None; + let mut schema_merger = SchemaMerger::new(); + let mut batches = vec![]; + + let mut stream = self.flatten(); + while let Some(msg) = stream.try_next().await.unwrap() { + match msg { + FlatIngesterQueryResponse::StartPartition { .. } => (), + FlatIngesterQueryResponse::RecordBatch { batch } => { + let last_schema = snapshot_schema.as_ref().unwrap(); + assert_eq!(&batch.schema(), last_schema); + batches.push(batch); + } + FlatIngesterQueryResponse::StartSnapshot { schema } => { + snapshot_schema = Some(Arc::clone(&schema)); + + schema_merger = schema_merger + .merge(&schema::Schema::try_from(schema).unwrap()) + .unwrap(); + } + } + } + + assert!(!batches.is_empty()); + + // equalize schemas + let common_schema = schema_merger.build().as_arrow(); + batches + .into_iter() + .map(|batch| { + let batch_schema = batch.schema(); + let columns = common_schema + .fields() + .iter() + .map(|field| match batch_schema.index_of(field.name()) { + Ok(idx) => Arc::clone(batch.column(idx)), + Err(_) => new_null_array(field.data_type(), batch.num_rows()), + }) + .collect(); + RecordBatch::try_new(Arc::clone(&common_schema), columns).unwrap() + }) + .collect() + } } /// Flattened version of [`IngesterQueryResponse`]. -pub(crate) type FlatIngesterQueryResponseStream = +pub type FlatIngesterQueryResponseStream = Pin> + Send>>; /// Element within the flat wire protocol. 
@@ -347,17 +400,15 @@ fn prepare_data_to_querier_for_partition( mod tests { use std::task::{Context, Poll}; - use arrow::{array::new_null_array, datatypes::SchemaRef, record_batch::RecordBatch}; + use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use arrow_util::assert_batches_sorted_eq; use assert_matches::assert_matches; use datafusion::{ physical_plan::RecordBatchStream, prelude::{col, lit}, }; - use futures::TryStreamExt; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use predicate::Predicate; - use schema::merge::SchemaMerger; use super::*; use crate::test_util::{make_ingester_data, DataLocation, TEST_NAMESPACE, TEST_TABLE}; @@ -490,8 +541,11 @@ mod tests { ]; for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let stream = prepare_data_to_querier(scenario, &request).await.unwrap(); - let result = ingester_response_to_record_batches(stream).await; + let result = prepare_data_to_querier(scenario, &request) + .await + .unwrap() + .into_record_batches() + .await; assert_batches_sorted_eq!(&expected, &result); } @@ -523,8 +577,11 @@ mod tests { ]; for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let stream = prepare_data_to_querier(scenario, &request).await.unwrap(); - let result = ingester_response_to_record_batches(stream).await; + let result = prepare_data_to_querier(scenario, &request) + .await + .unwrap() + .into_record_batches() + .await; assert_batches_sorted_eq!(&expected, &result); } @@ -565,8 +622,11 @@ mod tests { ]; for (loc, scenario) in &scenarios { println!("Location: {loc:?}"); - let stream = prepare_data_to_querier(scenario, &request).await.unwrap(); - let result = ingester_response_to_record_batches(stream).await; + let result = prepare_data_to_querier(scenario, &request) + .await + .unwrap() + .into_record_batches() + .await; assert_batches_sorted_eq!(&expected, &result); } @@ -640,60 +700,4 @@ mod tests { fn lp_to_batch(lp: &str) -> RecordBatch { 
lp_to_mutable_batch(lp).1.to_arrow(Selection::All).unwrap() } - - /// Convert [`IngesterQueryResponse`] to a set of [`RecordBatch`]es. - /// - /// If the response contains multiple snapshots, this will merge the schemas into a single one and create - /// NULL-columns for snapshots that miss columns. This makes it easier to use the resulting batches with - /// [`assert_batches_sorted_eq`]. - /// - /// # Panic - /// Panics if there are no batches returned at all. Also panics if the snapshot-scoped schemas do not line up with - /// the snapshot-scoped record batches. - async fn ingester_response_to_record_batches( - response: IngesterQueryResponse, - ) -> Vec { - let mut snapshot_schema = None; - let mut schema_merger = SchemaMerger::new(); - let mut batches = vec![]; - - let mut stream = response.flatten(); - while let Some(msg) = stream.try_next().await.unwrap() { - match msg { - FlatIngesterQueryResponse::StartPartition { .. } => (), - FlatIngesterQueryResponse::RecordBatch { batch } => { - let last_schema = snapshot_schema.as_ref().unwrap(); - assert_eq!(&batch.schema(), last_schema); - batches.push(batch); - } - FlatIngesterQueryResponse::StartSnapshot { schema } => { - snapshot_schema = Some(Arc::clone(&schema)); - - schema_merger = schema_merger - .merge(&schema::Schema::try_from(schema).unwrap()) - .unwrap(); - } - } - } - - assert!(!batches.is_empty()); - - // equalize schemas - let common_schema = schema_merger.build().as_arrow(); - batches - .into_iter() - .map(|batch| { - let batch_schema = batch.schema(); - let columns = common_schema - .fields() - .iter() - .map(|field| match batch_schema.index_of(field.name()) { - Ok(idx) => Arc::clone(batch.column(idx)), - Err(_) => new_null_array(field.data_type(), batch.num_rows()), - }) - .collect(); - RecordBatch::try_new(Arc::clone(&common_schema), columns).unwrap() - }) - .collect() - } } From b12d472a17a088c4f8475a8a6d1c30e0465bbaf2 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 18 Oct 2022 17:25:18 +0200 
Subject: [PATCH 2/5] test(ingester): add integration TestContext Adds a test helper type that maintains the in-memory state for a single ingester integration test, and provides easy-to-use methods to manipulate and inspect the ingester instance. --- ingester/src/lifecycle.rs | 2 +- ingester/tests/common/mod.rs | 350 +++++++++++++++++++++++++++++++++++ 2 files changed, 351 insertions(+), 1 deletion(-) create mode 100644 ingester/tests/common/mod.rs diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index d15389ed60..89cdca01dd 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -201,7 +201,7 @@ pub struct LifecycleConfig { impl LifecycleConfig { /// Initialize a new LifecycleConfig. panics if the passed `pause_ingest_size` is less than the /// `persist_memory_threshold`. - pub fn new( + pub const fn new( pause_ingest_size: usize, persist_memory_threshold: usize, partition_size_threshold: usize, diff --git a/ingester/tests/common/mod.rs b/ingester/tests/common/mod.rs new file mode 100644 index 0000000000..026a40e132 --- /dev/null +++ b/ingester/tests/common/mod.rs @@ -0,0 +1,350 @@ +use std::{collections::HashMap, num::NonZeroU32, sync::Arc, time::Duration}; + +use data_types::{ + Namespace, NamespaceSchema, PartitionKey, QueryPoolId, Sequence, SequenceNumber, ShardId, + ShardIndex, TopicId, +}; +use dml::{DmlMeta, DmlWrite}; +use generated_types::ingester::IngesterQueryRequest; +use ingester::{ + handler::{IngestHandler, IngestHandlerImpl}, + lifecycle::LifecycleConfig, + querier_handler::IngesterQueryResponse, +}; +use iox_catalog::{interface::Catalog, mem::MemCatalog, validate_or_insert_schema}; +use iox_query::exec::Executor; +use iox_time::TimeProvider; +use metric::{Attributes, Metric, MetricObserver}; +use mutable_batch_lp::lines_to_batches; +use object_store::DynObjectStore; +use observability_deps::tracing::*; +use test_helpers::{maybe_start_logging, timeout::FutureTimeout}; +use write_buffer::{ + 
core::WriteBufferReading, + mock::{MockBufferForReading, MockBufferSharedState}, +}; +use write_summary::ShardProgress; + +/// The byte size of 1 MiB. +const ONE_MIB: usize = 1024 * 1024; + +/// The shard index used for the [`TestContext`]. +pub const TEST_SHARD_INDEX: ShardIndex = ShardIndex::new(0); + +/// The topic name used for tests. +pub const TEST_TOPIC_NAME: &str = "banana-topics"; + +/// The lifecycle configuration used for tests. +pub const TEST_LIFECYCLE_CONFIG: LifecycleConfig = LifecycleConfig::new( + ONE_MIB, + ONE_MIB / 10, + ONE_MIB / 10, + Duration::from_secs(10), + Duration::from_secs(10), + 1_000, +); + +pub struct TestContext { + ingester: IngestHandlerImpl, + + // Catalog data initialised at construction time for later reuse. + query_id: QueryPoolId, + topic_id: TopicId, + shard_id: ShardId, + + // A map of namespaces to schemas, also serving as the set of known + // namespaces. + namespaces: HashMap, + + catalog: Arc, + object_store: Arc, + write_buffer_state: MockBufferSharedState, + metrics: Arc, +} + +impl TestContext { + pub async fn new() -> Self { + maybe_start_logging(); + + let metrics: Arc = Default::default(); + let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); + + // Initialise a topic, query pool and shard. + // + // Note that tests should set up their own namespace via + // ensure_namespace() + let mut txn = catalog.start_transaction().await.unwrap(); + let topic = txn.topics().create_or_get(TEST_TOPIC_NAME).await.unwrap(); + let query_id = txn + .query_pools() + .create_or_get("banana-query-pool") + .await + .unwrap() + .id; + let shard = txn + .shards() + .create_or_get(&topic, TEST_SHARD_INDEX) + .await + .unwrap(); + txn.commit().await.unwrap(); + + // Mock in-memory write buffer. 
+ let write_buffer_state = + MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap()); + let write_buffer_read: Arc = + Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()); + + // Mock object store that persists in memory. + let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); + + let ingester = IngestHandlerImpl::new( + TEST_LIFECYCLE_CONFIG, + topic.clone(), + [(TEST_SHARD_INDEX, shard)].into_iter().collect(), + Arc::clone(&catalog), + Arc::clone(&object_store), + write_buffer_read, + Arc::new(Executor::new(1)), + Arc::clone(&metrics), + true, + 1, + ) + .await + .unwrap(); + + Self { + ingester, + query_id, + topic_id: topic.id, + shard_id: shard.id, + catalog, + object_store, + write_buffer_state, + metrics, + namespaces: Default::default(), + } + } + + /// Restart the Ingester, driving initialisation again. + /// + /// NOTE: metric contents are not reset. + pub async fn restart(&mut self) { + info!("restarting test context ingester"); + + let write_buffer_read: Arc = + Arc::new(MockBufferForReading::new(self.write_buffer_state.clone(), None).unwrap()); + + let topic = self + .catalog + .repositories() + .await + .topics() + .create_or_get(TEST_TOPIC_NAME) + .await + .unwrap(); + + let shard = self + .catalog + .repositories() + .await + .shards() + .create_or_get(&topic, TEST_SHARD_INDEX) + .await + .unwrap(); + + self.ingester = IngestHandlerImpl::new( + TEST_LIFECYCLE_CONFIG, + topic, + [(TEST_SHARD_INDEX, shard)].into_iter().collect(), + Arc::clone(&self.catalog), + Arc::clone(&self.object_store), + write_buffer_read, + Arc::new(Executor::new(1)), + Arc::clone(&self.metrics), + true, + 1, + ) + .await + .unwrap(); + } + + /// Create a namespace in the catalog for the ingester to discover. + /// + /// # Panics + /// + /// Must not be called twice with the same `name`. 
+ #[track_caller] + pub async fn ensure_namespace(&mut self, name: &str) -> Namespace { + let ns = self + .catalog + .repositories() + .await + .namespaces() + .create( + name, + iox_catalog::INFINITE_RETENTION_POLICY, + self.topic_id, + self.query_id, + ) + .await + .expect("failed to create test namespace"); + + assert!( + self.namespaces + .insert( + name.to_owned(), + NamespaceSchema::new( + ns.id, + self.topic_id, + self.query_id, + iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + ), + ) + .is_none(), + "namespace must not be duplicated" + ); + + debug!(?ns, "test namespace created"); + + ns + } + + /// Enqueue the specified `op` into the write buffer for the ingester to + /// consume. + /// + /// This call takes care of validating the schema of `op` and populating the + /// catalog with any new schema elements. + /// + /// # Panics + /// + /// This method panics if the namespace for `op` does not exist, or the + /// schema is invalid or conflicts with the existing namespace schema. + #[track_caller] + pub async fn enqueue_write(&mut self, op: DmlWrite) -> SequenceNumber { + let schema = self + .namespaces + .get_mut(op.namespace()) + .expect("namespace does not exist"); + + // Pull the sequence number out of the op to return it back to the user + // for simplicity. + let offset = op + .meta() + .sequence() + .expect("write must be sequenced") + .sequence_number; + + // Perform schema validation, populating the catalog. + let mut repo = self.catalog.repositories().await; + if let Some(new) = validate_or_insert_schema(op.tables(), schema, repo.as_mut()) + .await + .expect("failed schema validation for enqueuing write") + { + // Retain the updated schema. + debug!(?schema, "updated test context schema"); + *schema = new; + } + + // Push the write into the write buffer. + self.write_buffer_state.push_write(op); + + debug!(?offset, "enqueued write in write buffer"); + offset + } + + /// A helper wrapper over [`Self::enqueue_write()`] for line-protocol. 
+ #[track_caller] + pub async fn write_lp( + &mut self, + namespace: &str, + lp: &str, + partition_key: PartitionKey, + sequence_number: i64, + ) -> SequenceNumber { + self.enqueue_write(DmlWrite::new( + namespace, + lines_to_batches(lp, 0).unwrap(), + Some(partition_key), + DmlMeta::sequenced( + Sequence::new(TEST_SHARD_INDEX, SequenceNumber::new(sequence_number)), + iox_time::SystemProvider::new().now(), + None, + 50, + ), + )) + .await + } + + /// Utilise the progress API to query for the current state of the test + /// shard. + pub async fn progress(&self) -> ShardProgress { + self.ingester + .progresses(vec![TEST_SHARD_INDEX]) + .await + .get(&TEST_SHARD_INDEX) + .unwrap() + .clone() + } + + /// Wait for the specified `offset` to be readable according to the external + /// progress API. + /// + /// # Panics + /// + /// This method panics if `offset` is not readable within 10 seconds. + pub async fn wait_for_readable(&self, offset: SequenceNumber) { + async { + loop { + let is_readable = self.progress().await.readable(offset); + if is_readable { + debug!(?offset, "offset reported as readable"); + return; + } + + trace!(?offset, "offset reported as not yet readable"); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + .with_timeout_panic(Duration::from_secs(10)) + .await; + } + + /// Submit a query to the ingester's public query interface. + pub async fn query( + &self, + req: IngesterQueryRequest, + ) -> Result { + self.ingester.query(req).await + } + + /// Retrieve the specified metric value. + pub fn get_metric(&self, name: &'static str, attrs: A) -> T::Recorder + where + T: MetricObserver, + A: Into, + { + let attrs = attrs.into(); + + self.metrics + .get_instrument::>(name) + .unwrap_or_else(|| panic!("failed to find metric {}", name)) + .get_observer(&attrs) + .unwrap_or_else(|| { + panic!( + "failed to find metric {} with attributes {:?}", + name, &attrs + ) + }) + .recorder() + } + + /// Return a reference to the catalog. 
+ pub fn catalog(&self) -> &dyn Catalog { + self.catalog.as_ref() + } + + /// Return the [`ShardId`] of the test shard. + pub fn shard_id(&self) -> ShardId { + self.shard_id + } +} From 7729494f61fbdaed8d6c9a18b5726ea64b811bb3 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 18 Oct 2022 19:37:12 +0200 Subject: [PATCH 3/5] test: write, query & progress API coverage This commit adds a new test that exercises all major external APIs of the ingester: * Writing data via the write buffer * Waiting for data to be readable via the progress API * Querying data and asserting the contents This should provide basic integration coverage for the Ingester internals. This commit also removes a similar test (though with less coverage) that was tightly coupled to the existing buffering structures. --- ingester/src/handler.rs | 266 ++-------------------------------------- ingester/tests/write.rs | 119 ++++++++++++++++++ 2 files changed, 127 insertions(+), 258 deletions(-) create mode 100644 ingester/tests/write.rs diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 5c3ae23fba..27d378c09d 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -435,11 +435,10 @@ impl Drop for IngestHandlerImpl { mod tests { use std::{num::NonZeroU32, ops::DerefMut}; - use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber}; + use data_types::{Namespace, NamespaceSchema, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlWrite}; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; use iox_time::Time; - use metric::{Attributes, Metric, U64Counter, U64Gauge}; use mutable_batch_lp::lines_to_batches; use object_store::memory::InMemory; use test_helpers::maybe_start_logging; @@ -448,179 +447,9 @@ mod tests { use super::*; use crate::data::{partition::SnapshotBatch, table::TableName}; - #[tokio::test] - async fn read_from_write_buffer_write_to_mutable_buffer() { - let ingester = TestIngester::new().await; - - let schema = 
NamespaceSchema::new( - ingester.namespace.id, - ingester.topic.id, - ingester.query_pool.id, - 100, - ); - let mut txn = ingester.catalog.start_transaction().await.unwrap(); - let ingest_ts1 = Time::from_timestamp_millis(42); - let ingest_ts2 = Time::from_timestamp_millis(1337); - let w1 = DmlWrite::new( - "foo", - lines_to_batches("mem foo=1 10", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(0)), - ingest_ts1, - None, - 50, - ), - ); - let schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - ingester.write_buffer_state.push_write(w1); - let w2 = DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 20\ncpu bar=3 30", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(7)), - ingest_ts2, - None, - 150, - ), - ); - let _schema = validate_or_insert_schema(w2.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - ingester.write_buffer_state.push_write(w2); - let w3 = DmlWrite::new( - "foo", - lines_to_batches("a b=2 200", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(9)), - ingest_ts2, - None, - 150, - ), - ); - let _schema = validate_or_insert_schema(w3.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - ingester.write_buffer_state.push_write(w3); - txn.commit().await.unwrap(); - - // give the writes some time to go through the buffer. Exit once we've verified there's - // data in there from both writes. 
- tokio::time::timeout(Duration::from_secs(2), async { - let ns_name = ingester.namespace.name.into(); - let table_name = TableName::from("a"); - loop { - let mut has_measurement = false; - - if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) { - if let Some(data) = data.namespace(&ns_name) { - // verify there's data in the buffer - if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await - { - if let Some(b) = b.first() { - if b.data.num_rows() > 0 { - has_measurement = true; - } - } - } - } - } - - // and ensure that the shard state was actually updated - let shard = ingester - .catalog - .repositories() - .await - .shards() - .create_or_get(&ingester.topic, ingester.shard_index) - .await - .unwrap(); - - if has_measurement - && shard.min_unpersisted_sequence_number == SequenceNumber::new(9) - { - break; - } - - tokio::time::sleep(Duration::from_millis(200)).await; - } - }) - .await - .expect("timeout"); - - let observation = ingester - .metrics - .get_instrument::>("ingester_op_apply_duration") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ("result", "success"), - ])) - .unwrap() - .fetch(); - let hits = observation.buckets.iter().map(|b| b.count).sum::(); - assert_eq!(hits, 3); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_read_bytes") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 350); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_last_sequence_number") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 9); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_sequence_number_lag") - .unwrap() - 
.get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, 0); - - let observation = ingester - .metrics - .get_instrument::>("ingester_write_buffer_last_ingest_ts") - .unwrap() - .get_observer(&Attributes::from(&[ - ("kafka_topic", "whatevs"), - ("kafka_partition", "0"), - ])) - .unwrap() - .fetch(); - assert_eq!(observation, ingest_ts2.timestamp_nanos() as u64); - } - #[tokio::test] async fn test_shutdown() { - let ingester = TestIngester::new().await.ingester; + let (ingester, _, _) = ingester_test_setup(vec![], 0, true).await; // does not exit w/o shutdown tokio::select! { @@ -638,7 +467,7 @@ mod tests { #[tokio::test] #[should_panic(expected = "Background worker 'bad_task' exited early!")] async fn test_join_task_early_shutdown() { - let mut ingester = TestIngester::new().await.ingester; + let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; let shutdown_task = tokio::spawn(async { // It does nothing! and stops. @@ -655,7 +484,7 @@ mod tests { #[tokio::test] #[should_panic(expected = "JoinError::Panic")] async fn test_join_task_panic() { - let mut ingester = TestIngester::new().await.ingester; + let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; let shutdown_task = tokio::spawn(async { panic!("bananas"); @@ -946,7 +775,7 @@ mod tests { #[tokio::test] async fn limits_concurrent_queries() { - let mut ingester = TestIngester::new().await; + let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; let request = IngesterQueryRequest { namespace: "foo".to_string(), table: "cpu".to_string(), @@ -954,93 +783,14 @@ mod tests { predicate: None, }; - let res = ingester.ingester.query(request.clone()).await.unwrap_err(); + let res = ingester.query(request.clone()).await.unwrap_err(); assert!(matches!( res, crate::querier_handler::Error::NamespaceNotFound { .. 
} )); - ingester.ingester.request_sem = Semaphore::new(0); - let res = ingester.ingester.query(request).await.unwrap_err(); + ingester.request_sem = Semaphore::new(0); + let res = ingester.query(request).await.unwrap_err(); assert!(matches!(res, crate::querier_handler::Error::RequestLimit)); } - - struct TestIngester { - catalog: Arc, - shard: Shard, - namespace: Namespace, - topic: TopicMetadata, - shard_index: ShardIndex, - query_pool: QueryPool, - metrics: Arc, - write_buffer_state: MockBufferSharedState, - ingester: IngestHandlerImpl, - } - - impl TestIngester { - async fn new() -> Self { - let metrics: Arc = Default::default(); - let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - - let mut txn = catalog.start_transaction().await.unwrap(); - let topic = txn.topics().create_or_get("whatevs").await.unwrap(); - let query_pool = txn.query_pools().create_or_get("whatevs").await.unwrap(); - let shard_index = ShardIndex::new(0); - let namespace = txn - .namespaces() - .create("foo", "inf", topic.id, query_pool.id) - .await - .unwrap(); - let shard = txn - .shards() - .create_or_get(&topic, shard_index) - .await - .unwrap(); - txn.commit().await.unwrap(); - - let mut shard_states = BTreeMap::new(); - shard_states.insert(shard_index, shard); - - let write_buffer_state = - MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(1).unwrap()); - let reading: Arc = - Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()); - let object_store = Arc::new(InMemory::new()); - - let lifecycle_config = LifecycleConfig::new( - 1000000, - 1000, - 1000, - Duration::from_secs(10), - Duration::from_secs(10), - 10000000, - ); - let ingester = IngestHandlerImpl::new( - lifecycle_config, - topic.clone(), - shard_states, - Arc::clone(&catalog), - object_store, - reading, - Arc::new(Executor::new(1)), - Arc::clone(&metrics), - false, - 1, - ) - .await - .unwrap(); - - Self { - catalog, - shard, - namespace, - topic, - shard_index, - 
query_pool, - metrics, - write_buffer_state, - ingester, - } - } - } } diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs new file mode 100644 index 0000000000..243cb75ea2 --- /dev/null +++ b/ingester/tests/write.rs @@ -0,0 +1,119 @@ +mod common; + +use arrow_util::assert_batches_sorted_eq; +pub use common::*; +use data_types::PartitionKey; +use generated_types::ingester::IngesterQueryRequest; +use iox_time::{SystemProvider, TimeProvider}; +use metric::{DurationHistogram, U64Counter, U64Gauge}; + +// Write data to an ingester through the write buffer interface, utilise the +// progress API to wait for it to become readable, and finally query the data +// and validate the contents. +#[tokio::test] +async fn test_write_query() { + let mut ctx = TestContext::new().await; + + ctx.ensure_namespace("test_namespace").await; + + // Initial write + let partition_key = PartitionKey::from("1970-01-01"); + ctx.write_lp( + "test_namespace", + "bananas greatness=\"unbounded\" 10", + partition_key.clone(), + 0, + ) + .await; + + // A subsequent write with a non-contiguous sequence number to a different table. + ctx.write_lp( + "test_namespace", + "cpu bar=2 20\ncpu bar=3 30", + partition_key.clone(), + 7, + ) + .await; + + // And a third write that appends more data to the table in the initial + // write. + let offset = ctx + .write_lp( + "test_namespace", + "bananas count=42 200", + partition_key.clone(), + 42, + ) + .await; + + ctx.wait_for_readable(offset).await; + + // Perform a query to validate the actual data buffered. 
+ let data = ctx + .query(IngesterQueryRequest { + namespace: "test_namespace".to_string(), + table: "bananas".to_string(), + columns: vec![], + predicate: None, + }) + .await + .expect("query should succeed") + .into_record_batches() + .await; + + let expected = vec![ + "+-------+-----------+--------------------------------+", + "| count | greatness | time |", + "+-------+-----------+--------------------------------+", + "| | unbounded | 1970-01-01T00:00:00.000000010Z |", + "| 42 | | 1970-01-01T00:00:00.000000200Z |", + "+-------+-----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + // Assert various ingest metrics. + let hist = ctx + .get_metric::( + "ingester_op_apply_duration", + &[ + ("kafka_topic", TEST_TOPIC_NAME), + ("kafka_partition", "0"), + ("result", "success"), + ], + ) + .fetch(); + assert_eq!(hist.sample_count(), 3); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_read_bytes", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + assert_eq!(metric, 150); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_last_sequence_number", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + assert_eq!(metric, 42); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_sequence_number_lag", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + assert_eq!(metric, 0); + + let metric = ctx + .get_metric::( + "ingester_write_buffer_last_ingest_ts", + &[("kafka_topic", TEST_TOPIC_NAME), ("kafka_partition", "0")], + ) + .fetch(); + let now = SystemProvider::new().now(); + assert!(metric < now.timestamp_nanos() as _); +} From 40f1937e63c11d04442a55621d2d716b8adae3d0 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 18 Oct 2022 22:13:13 +0200 Subject: [PATCH 4/5] test: write buffer seeking tests Asserts write buffer seeking behaviour, including: * Seeking past already persisted data correctly * 
Skipping to next available op in non-contiguous offset stream * Skipping to next available op for dropped ops due to retention * Panics when seeking beyond available data (into the future) Removes a pair of tests that covered some of the above due to their tight coupling with ingester internals. --- ingester/src/handler.rs | 115 --------------- ingester/tests/write.rs | 304 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 303 insertions(+), 116 deletions(-) diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 27d378c09d..d5c2def7e9 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -445,7 +445,6 @@ mod tests { use write_buffer::mock::{MockBufferForReading, MockBufferSharedState}; use super::*; - use crate::data::{partition::SnapshotBatch, table::TableName}; #[tokio::test] async fn test_shutdown() { @@ -583,88 +582,6 @@ mod tests { (ingester, shard, namespace) } - async fn verify_ingester_buffer_has_data( - ingester: IngestHandlerImpl, - shard: Shard, - namespace: Namespace, - custom_batch_verification: impl Fn(&SnapshotBatch) + Send, - ) { - // give the writes some time to go through the buffer. 
Exit once we've verified there's - // data in there - tokio::time::timeout(Duration::from_secs(1), async move { - let ns_name = namespace.name.into(); - let table_name = TableName::from("cpu"); - loop { - let mut has_measurement = false; - - if let Some(data) = ingester.data.shard(shard.id) { - if let Some(data) = data.namespace(&ns_name) { - // verify there's data in the buffer - if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await - { - if let Some(b) = b.first() { - custom_batch_verification(b); - - if b.data.num_rows() == 1 { - has_measurement = true; - } - } - } - } - } - - if has_measurement { - break; - } - - tokio::time::sleep(Duration::from_millis(200)).await; - } - }) - .await - .expect("timeout"); - } - - #[tokio::test] - async fn seeks_on_initialization() { - let ingest_ts1 = Time::from_timestamp_millis(42); - let ingest_ts2 = Time::from_timestamp_millis(1337); - let write_operations = vec![ - DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 20", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(1)), - ingest_ts1, - None, - 150, - ), - ), - DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 30", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(2)), - ingest_ts2, - None, - 150, - ), - ), - ]; - - let (ingester, shard, namespace) = ingester_test_setup(write_operations, 2, false).await; - - verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| { - if first_batch.min_sequence_number == SequenceNumber::new(1) { - panic!( - "initialization did a seek to the beginning rather than the min_unpersisted" - ); - } - }) - .await; - } - #[tokio::test] #[should_panic(expected = "JoinError::Panic")] async fn sequence_number_no_longer_exists() { @@ -741,38 +658,6 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn skip_to_oldest_available() { - maybe_start_logging(); - - let 
ingest_ts1 = Time::from_timestamp_millis(42); - let write_operations = vec![DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 20", 0).unwrap(), - Some("1970-01-01".into()), - DmlMeta::sequenced( - Sequence::new(ShardIndex::new(0), SequenceNumber::new(10)), - ingest_ts1, - None, - 150, - ), - )]; - - // Set the min unpersisted to something bigger than the write's sequence number to - // cause an UnknownSequenceNumber error. Skip to oldest available = true, so ingester - // should find data - let (ingester, shard, namespace) = ingester_test_setup(write_operations, 1, true).await; - - verify_ingester_buffer_has_data(ingester, shard, namespace, |first_batch| { - assert_eq!( - first_batch.min_sequence_number, - SequenceNumber::new(10), - "re-initialization didn't seek to the beginning", - ); - }) - .await; - } - #[tokio::test] async fn limits_concurrent_queries() { let (mut ingester, _, _) = ingester_test_setup(vec![], 0, true).await; diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index 243cb75ea2..cbb6dd2908 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -1,8 +1,9 @@ mod common; use arrow_util::assert_batches_sorted_eq; +use assert_matches::assert_matches; pub use common::*; -use data_types::PartitionKey; +use data_types::{Partition, PartitionKey, SequenceNumber}; use generated_types::ingester::IngesterQueryRequest; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, U64Counter, U64Gauge}; @@ -117,3 +118,304 @@ async fn test_write_query() { let now = SystemProvider::new().now(); assert!(metric < now.timestamp_nanos() as _); } + +// Ensure an ingester correctly seeks to the offset stored in the catalog at +// startup, skipping any empty offsets. +#[tokio::test] +async fn test_seek_on_init() { + let mut ctx = TestContext::new().await; + + // Place some writes into the write buffer. 
+ + let partition_key = PartitionKey::from("1970-01-01"); + + ctx.ensure_namespace("test_namespace").await; + ctx.write_lp( + "test_namespace", + "bananas greatness=\"unbounded\" 10", + partition_key.clone(), + 0, + ) + .await; + + // A subsequent write with a non-contiguous sequence number to a different + // table. + // + // Resuming will be configured against an offset in the middle of the two + // ranges. + let w2 = ctx + .write_lp( + "test_namespace", + "bananas greatness=\"amazing\",platanos=42 20", + partition_key.clone(), + 7, + ) + .await; + + // Wait for the writes to be processed. + ctx.wait_for_readable(w2).await; + + // Assert the data in memory. + let data = ctx + .query(IngesterQueryRequest { + namespace: "test_namespace".to_string(), + table: "bananas".to_string(), + columns: vec![], + predicate: None, + }) + .await + .expect("query should succeed") + .into_record_batches() + .await; + + let expected = vec![ + "+-----------+----------+--------------------------------+", + "| greatness | platanos | time |", + "+-----------+----------+--------------------------------+", + "| amazing | 42 | 1970-01-01T00:00:00.000000020Z |", + "| unbounded | | 1970-01-01T00:00:00.000000010Z |", + "+-----------+----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + // Update the catalog state, causing the next boot of the ingester to seek + // past the first write, but before the second write. + ctx.catalog() + .repositories() + .await + .shards() + .update_min_unpersisted_sequence_number(ctx.shard_id(), SequenceNumber::new(3)) + .await + .expect("failed to update persisted marker"); + + // Restart the ingester. + ctx.restart().await; + + // Wait for the second write to become readable again. + ctx.wait_for_readable(w2).await; + + // Assert the data in memory now contains only w2. 
+ let data = ctx + .query(IngesterQueryRequest { + namespace: "test_namespace".to_string(), + table: "bananas".to_string(), + columns: vec![], + predicate: None, + }) + .await + .expect("query should succeed") + .into_record_batches() + .await; + + let expected = vec![ + "+-----------+----------+--------------------------------+", + "| greatness | platanos | time |", + "+-----------+----------+--------------------------------+", + "| amazing | 42 | 1970-01-01T00:00:00.000000020Z |", + "+-----------+----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); +} + +// Ensure an ingester respects the per-partition persist watermark, skipping +// already applied ops. +#[tokio::test] +async fn test_skip_previously_applied_partition_ops() { + let mut ctx = TestContext::new().await; + + // Place some writes into the write buffer. + let ns = ctx.ensure_namespace("test_namespace").await; + let partition_key = PartitionKey::from("1970-01-01"); + ctx.write_lp( + "test_namespace", + "bananas greatness=\"unbounded\" 10", + partition_key.clone(), + 5, + ) + .await; + let w2 = ctx + .write_lp( + "test_namespace", + "bananas greatness=\"amazing\",platanos=42 20", + partition_key.clone(), + 10, + ) + .await; + + // Wait for the writes to be processed. + ctx.wait_for_readable(w2).await; + + // Assert the data in memory. 
+ let data = ctx + .query(IngesterQueryRequest { + namespace: "test_namespace".to_string(), + table: "bananas".to_string(), + columns: vec![], + predicate: None, + }) + .await + .expect("query should succeed") + .into_record_batches() + .await; + + let expected = vec![ + "+-----------+----------+--------------------------------+", + "| greatness | platanos | time |", + "+-----------+----------+--------------------------------+", + "| amazing | 42 | 1970-01-01T00:00:00.000000020Z |", + "| unbounded | | 1970-01-01T00:00:00.000000010Z |", + "+-----------+----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + // Read the partition ID of the writes above. + let partitions = ctx + .catalog() + .repositories() + .await + .partitions() + .list_by_namespace(ns.id) + .await + .unwrap(); + assert_matches!(&*partitions, &[Partition { .. }]); + + // And set the per-partition persist marker after the first write, but + // before the second. + ctx.catalog() + .repositories() + .await + .partitions() + .update_persisted_sequence_number(partitions[0].id, SequenceNumber::new(6)) + .await + .expect("failed to update persisted marker"); + + // Restart the ingester, which shall seek to the shard offset of 0, and + // begin replaying ops. + ctx.restart().await; + + // Wait for the second write to become readable again. + ctx.wait_for_readable(w2).await; + + // Assert the partition replay skipped the first write. 
+ let data = ctx + .query(IngesterQueryRequest { + namespace: "test_namespace".to_string(), + table: "bananas".to_string(), + columns: vec![], + predicate: None, + }) + .await + .expect("query should succeed") + .into_record_batches() + .await; + + let expected = vec![ + "+-----------+----------+--------------------------------+", + "| greatness | platanos | time |", + "+-----------+----------+--------------------------------+", + "| amazing | 42 | 1970-01-01T00:00:00.000000020Z |", + "+-----------+----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); +} + +// Ensure a seek beyond the actual data available (i.e. into the future) causes +// a panic to bring about a human response. +#[tokio::test] +#[should_panic = "attempted to seek to offset 42, but current high watermark for partition 0 is 0"] +async fn test_seek_beyond_available_data() { + let mut ctx = TestContext::new().await; + + // Place a write into the write buffer so it is not empty. + ctx.ensure_namespace("test_namespace").await; + ctx.write_lp( + "test_namespace", + "bananas greatness=\"unbounded\" 10", + PartitionKey::from("1970-01-01"), + 0, + ) + .await; + + // Update the catalog state, causing the next boot of the ingester to seek + // past the write, beyond valid data offsets. + ctx.catalog() + .repositories() + .await + .shards() + .update_min_unpersisted_sequence_number(ctx.shard_id(), SequenceNumber::new(42)) + .await + .expect("failed to update persisted marker"); + + // Restart the ingester. + ctx.restart().await; +} + +// Ensure an ingester configured to resume from offset 1 correctly seeks to the +// oldest available data when that offset no longer exists. +#[tokio::test] +async fn test_seek_dropped_offset() { + let mut ctx = TestContext::new().await; + + // Place a write into the write buffer so it is not empty. 
+ ctx.ensure_namespace("test_namespace").await;
+
+ // A write at offset 42
+ let w1 = ctx
+ .write_lp(
+ "test_namespace",
+ "bananas greatness=\"unbounded\" 10",
+ PartitionKey::from("1970-01-01"),
+ 42,
+ )
+ .await;
+
+ // Configure the ingester to seek to offset 1, which does not exist.
+ ctx.catalog()
+ .repositories()
+ .await
+ .shards()
+ .update_min_unpersisted_sequence_number(ctx.shard_id(), SequenceNumber::new(1))
+ .await
+ .expect("failed to update persisted marker");
+
+ // Restart the ingester.
+ ctx.restart().await;
+
+ // Wait for the op to be applied
+ ctx.wait_for_readable(w1).await;
+
+ // Assert the data in memory now contains only w1.
+ let data = ctx
+ .query(IngesterQueryRequest {
+ namespace: "test_namespace".to_string(),
+ table: "bananas".to_string(),
+ columns: vec![],
+ predicate: None,
+ })
+ .await
+ .expect("query should succeed")
+ .into_record_batches()
+ .await;
+
+ let expected = vec![
+ "+-----------+--------------------------------+",
+ "| greatness | time |",
+ "+-----------+--------------------------------+",
+ "| unbounded | 1970-01-01T00:00:00.000000010Z |",
+ "+-----------+--------------------------------+",
+ ];
+ assert_batches_sorted_eq!(&expected, &data);
+
+ // Ensure the metric was set to cause an alert for potential data loss.
+ let metric = ctx
+ .get_metric::(
+ "shard_reset_count",
+ &[
+ ("kafka_topic", TEST_TOPIC_NAME),
+ ("kafka_partition", "0"),
+ ("potential_data_loss", "true"),
+ ],
+ )
+ .fetch();
+ assert!(metric > 0);
+}

From 0c0a38c4844064057faee77b764a8afaedc90d23 Mon Sep 17 00:00:00 2001
From: Dom Dwyer 
Date: Wed, 19 Oct 2022 11:47:03 +0200
Subject: [PATCH 5/5] refactor: more verbose shard reset logs

Adds a little more context to the "shard reset" logs.
--- ingester/src/stream_handler/handler.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 1b674fd2ea..3f32b20bcd 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -279,7 +279,8 @@ where shard_index=%self.shard_index, shard_id=%self.shard_id, potential_data_loss=true, - "reset stream" + "unable to read from desired sequence number offset \ + - reset stream to oldest available data" ); self.shard_reset_count.inc(1); sequence_number_before_reset = Some(self.current_sequence_number); @@ -293,7 +294,8 @@ where shard_index=%self.shard_index, shard_id=%self.shard_id, potential_data_loss=true, - "unable to read from desired sequence number offset" + "unable to read from desired sequence number offset \ + - aborting ingest due to configuration" ); self.shard_unknown_sequence_number_count.inc(1); None