diff --git a/.circleci/config.yml b/.circleci/config.yml
index 00381812eb..d9bb1587b5 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -131,6 +131,7 @@ jobs:
   doc:
     docker:
       - image: quay.io/influxdb/rust:ci
+    resource_class: medium+ # use of a smaller executor runs out of memory
     environment:
       # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
       CARGO_INCREMENTAL: "0"
diff --git a/Cargo.lock b/Cargo.lock
index 56ecf9a0c8..7ae4ed09d1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2008,9 +2008,9 @@ dependencies = [

 [[package]]
 name = "http-body"
-version = "0.4.4"
+version = "0.4.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
+checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
 dependencies = [
  "bytes",
  "http",
@@ -4418,9 +4418,9 @@ dependencies = [

 [[package]]
 name = "regex"
-version = "1.5.5"
+version = "1.5.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
+checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -4438,9 +4438,9 @@ dependencies = [

 [[package]]
 name = "regex-syntax"
-version = "0.6.25"
+version = "0.6.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
+checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64"

 [[package]]
 name = "remove_dir_all"
diff --git a/cache_system/src/backend/mod.rs b/cache_system/src/backend/mod.rs
index 92eb89543f..612fa83329 100644
--- a/cache_system/src/backend/mod.rs
+++ b/cache_system/src/backend/mod.rs
@@ -5,6 +5,7 @@ pub mod addressable_heap;
 pub mod hash_map;
 pub mod lru;
 pub mod resource_consumption;
+pub mod shared;
 pub mod ttl;

 #[cfg(test)]
diff --git a/cache_system/src/backend/shared.rs b/cache_system/src/backend/shared.rs
new file mode 100644
index 0000000000..8efbb7d14d
--- /dev/null
+++ b/cache_system/src/backend/shared.rs
@@ -0,0 +1,156 @@
+//! Backend that supports custom removal / expiry of keys
+use parking_lot::Mutex;
+use std::{any::Any, fmt::Debug, hash::Hash, sync::Arc};
+
+use super::CacheBackend;
+
+/// Cache backend that allows another backend to be shared by managing
+/// a mutex internally.
+///
+/// This allows explicitly removing entries from the cache, for
+/// example, based on a policy.
+#[derive(Debug, Clone)]
+pub struct SharedBackend<K, V>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+{
+    inner_backend: Arc<Mutex<Box<dyn CacheBackend<K = K, V = V>>>>,
+}
+
+impl<K, V> SharedBackend<K, V>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+{
+    /// Create new backend around the inner backend
+    pub fn new(inner_backend: Box<dyn CacheBackend<K = K, V = V>>) -> Self {
+        Self {
+            inner_backend: Arc::new(Mutex::new(inner_backend)),
+        }
+    }
+
+    /// "Remove" a key (i.e. remove it from the shared backend) if the
+    /// specified predicate is true. Returns `true` if the key was
+    /// removed, `false` otherwise.
+    ///
+    /// Note that the predicate function is called while the lock is
+    /// held (and thus the inner backend can't be concurrently accessed).
+    pub fn remove_if<P>(&self, k: &K, predicate: P) -> bool
+    where
+        P: Fn(V) -> bool,
+    {
+        let mut inner_backend = self.inner_backend.lock();
+        if let Some(v) = inner_backend.get(k) {
+            if predicate(v) {
+                inner_backend.remove(k);
+                return true;
+            }
+        }
+        false
+    }
+}
+
+impl<K, V> CacheBackend for SharedBackend<K, V>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+{
+    type K = K;
+    type V = V;
+
+    fn get(&mut self, k: &Self::K) -> Option<Self::V> {
+        self.inner_backend.lock().get(k)
+    }
+
+    fn set(&mut self, k: Self::K, v: Self::V) {
+        self.inner_backend.lock().set(k, v);
+    }
+
+    fn remove(&mut self, k: &Self::K) {
+        self.inner_backend.lock().remove(k)
+    }
+
+    fn is_empty(&self) -> bool {
+        self.inner_backend.lock().is_empty()
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use super::*;
+
+    #[test]
+    fn test_generic() {
+        crate::backend::test_util::test_generic(|| SharedBackend::new(test_backend()))
+    }
+
+    #[test]
+    fn test_is_shared() {
+        let mut backend1 = SharedBackend::new(test_backend());
+        let mut backend2 = backend1.clone();
+
+        // test that a shared backend is really shared
+        backend1.set(1, "foo".into());
+        backend2.set(2, "bar".into());
+
+        assert_eq!(backend1.get(&1), Some("foo".into()));
+        assert_eq!(backend2.get(&1), Some("foo".into()));
+        assert_eq!(backend1.get(&2), Some("bar".into()));
+        assert_eq!(backend2.get(&2), Some("bar".into()));
+
+        // make a third backend and it should also modify the previous ones
+        let mut backend3 = backend1.clone();
+        assert_eq!(backend3.get(&1), Some("foo".into()));
+        assert_eq!(backend3.get(&2), Some("bar".into()));
+
+        // update key 2
+        backend3.set(2, "baz".into());
+        assert_eq!(backend1.get(&2), Some("baz".into()));
+        assert_eq!(backend2.get(&2), Some("baz".into()));
+        assert_eq!(backend3.get(&2), Some("baz".into()));
+    }
+
+    #[test]
+    fn test_remove_if() {
+        let mut backend = SharedBackend::new(test_backend());
+        backend.set(1, "foo".into());
+        backend.set(2, "bar".into());
+
+        backend.remove_if(&1, |v| v == "zzz");
+        assert_eq!(backend.get(&1), Some("foo".into()));
+        assert_eq!(backend.get(&2), Some("bar".into()));
+
+        backend.remove_if(&1, |v| v == "foo");
+        assert_eq!(backend.get(&1), None);
+        assert_eq!(backend.get(&2), Some("bar".into()));
+
+        backend.remove_if(&1, |v| v == "bar");
+        assert_eq!(backend.get(&1), None);
+        assert_eq!(backend.get(&2), Some("bar".into()));
+    }
+
+    #[test]
+    fn test_remove_if_shared() {
+        let mut backend = SharedBackend::new(test_backend());
+        backend.set(1, "foo".into());
+        backend.set(2, "bar".into());
+
+        let backend2 = backend.clone();
+        backend2.remove_if(&1, |v| v == "foo");
+
+        // original backend should reflect the changes
+        assert_eq!(backend.get(&1), None);
+        assert_eq!(backend.get(&2), Some("bar".into()));
+    }
+
+    fn test_backend() -> Box<dyn CacheBackend<K = i8, V = String>> {
+        Box::new(HashMap::new())
+    }
+}
diff --git a/clap_blocks/src/ingester.rs b/clap_blocks/src/ingester.rs
index a3f06801c7..40df44e6c1 100644
--- a/clap_blocks/src/ingester.rs
+++ b/clap_blocks/src/ingester.rs
@@ -15,19 +15,18 @@ pub struct IngesterConfig {
     )]
     pub write_buffer_partition_range_end: i32,

-    /// The ingester will continue to pull data and buffer it from Kafka
-    /// as long as it is below this size. If it hits this size it will pause
-    /// ingest from Kafka until persistence goes below this threshold.
+    /// The ingester will continue to pull data and buffer it from the write buffer as long as the
+    /// ingester buffer is below this size.
If the ingester buffer hits this size, ingest from the + /// write buffer will pause until the ingester buffer goes below this threshold. #[clap( long = "--pause-ingest-size-bytes", env = "INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES" )] pub pause_ingest_size_bytes: usize, - /// Once the ingester crosses this threshold of data buffered across - /// all sequencers, it will pick the largest partitions and persist - /// them until it falls below this threshold. An ingester running in - /// a steady state is expected to take up this much memory. + /// Once the ingester crosses this threshold of data buffered across all sequencers, it will + /// pick the largest partitions and persist them until it falls below this threshold. An + /// ingester running in a steady state is expected to take up this much memory. #[clap( long = "--persist-memory-threshold-bytes", env = "INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES" @@ -43,10 +42,9 @@ pub struct IngesterConfig { )] pub persist_partition_size_threshold_bytes: usize, - /// If a partition has had data buffered for longer than this period of time - /// it will be persisted. This puts an upper bound on how far back the - /// ingester may need to read in Kafka on restart or recovery. The default value - /// is 30 minutes (in seconds). + /// If a partition has had data buffered for longer than this period of time, it will be + /// persisted. This puts an upper bound on how far back the ingester may need to read from the + /// write buffer on restart or recovery. The default value is 30 minutes (in seconds). #[clap( long = "--persist-partition-age-threshold-seconds", env = "INFLUXDB_IOX_PERSIST_PARTITION_AGE_THRESHOLD_SECONDS", @@ -62,4 +60,14 @@ pub struct IngesterConfig { default_value = "300" )] pub persist_partition_cold_threshold_seconds: u64, + + /// If the catalog's max sequence number for the partition is no longer available in the write + /// buffer due to the retention policy, by default the ingester will panic. If this flag is + /// specified, the ingester will skip any sequence numbers that have not been retained in the + /// write buffer and will start up successfully with the oldest available data. + #[clap( + long = "--skip-to-oldest-available", + env = "INFLUXDB_IOX_SKIP_TO_OLDEST_AVAILABLE" + )] + pub skip_to_oldest_available: bool, } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 372b247e2b..32be1c0d53 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -1852,8 +1852,8 @@ mod tests { // They should be 2 groups assert_eq!(groups.len(), 2, "There should have been two group"); - groups[0].parquet_files.contains(&pf1); - groups[1].parquet_files.contains(&pf2); + assert!(groups[0].parquet_files.contains(&pf1)); + assert!(groups[1].parquet_files.contains(&pf2)); } #[test] diff --git a/influxdb2_client/src/lib.rs b/influxdb2_client/src/lib.rs index 6e36bb3fe5..3f53c11459 100644 --- a/influxdb2_client/src/lib.rs +++ b/influxdb2_client/src/lib.rs @@ -1,10 +1,11 @@ #![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] +// `clippy::use_self` is deliberately excluded from the lints this crate uses. +// See . 
#![warn( missing_copy_implementations, missing_debug_implementations, missing_docs, clippy::explicit_iter_loop, - clippy::use_self, clippy::clone_on_ref_ptr, clippy::future_not_send )] diff --git a/influxdb2_client/tests/common/server_fixture.rs b/influxdb2_client/tests/common/server_fixture.rs index 2d6dde839f..fc0eab81d7 100644 --- a/influxdb2_client/tests/common/server_fixture.rs +++ b/influxdb2_client/tests/common/server_fixture.rs @@ -71,10 +71,13 @@ impl ServerFixture { let shared_server = SHARED_SERVER.get_or_init(|| parking_lot::Mutex::new(Weak::new())); - let mut shared_server = shared_server.lock(); + let shared_upgraded = { + let locked = shared_server.lock(); + locked.upgrade() + }; // is a shared server already present? - let server = match shared_server.upgrade() { + let server = match shared_upgraded { Some(server) => server, None => { // if not, create one @@ -86,11 +89,11 @@ impl ServerFixture { // save a reference for other threads that may want to // use this server, but don't prevent it from being // destroyed when going out of scope + let mut shared_server = shared_server.lock(); *shared_server = Arc::downgrade(&server); server } }; - std::mem::drop(shared_server); Self { server } } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 0d1dd656a5..58da1684c8 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -334,6 +334,10 @@ impl Config { let write_buffer_partition_range_start = 0; let write_buffer_partition_range_end = 0; + // Use whatever data is available in the write buffer rather than erroring if the sequence + // number has not been retained in the write buffer. + let skip_to_oldest_available = true; + let ingester_config = IngesterConfig { write_buffer_partition_range_start, write_buffer_partition_range_end, @@ -342,6 +346,7 @@ impl Config { persist_partition_size_threshold_bytes, persist_partition_age_threshold_seconds, persist_partition_cold_threshold_seconds, + skip_to_oldest_available, }; // create a CompactorConfig for the all in one server based on diff --git a/ingester/src/data.rs b/ingester/src/data.rs index ce48216c9c..1cf6360908 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -2206,8 +2206,8 @@ mod tests { .await .unwrap(); { - let tables = data.tables.read(); - let table = tables.get("mem").unwrap().read().await; + let table_data = data.table_data("mem").unwrap(); + let table = table_data.read().await; let p = table.partition_data.get("1970-01-01").unwrap(); assert_eq!( p.data.max_persisted_sequence_number, @@ -2229,8 +2229,8 @@ mod tests { .await .unwrap(); - let tables = data.tables.read(); - let table = tables.get("mem").unwrap().read().await; + let table_data = data.table_data("mem").unwrap(); + let table = table_data.read().await; let partition = table.partition_data.get("1970-01-01").unwrap(); assert_eq!( partition.data.buffer.as_ref().unwrap().min_sequence_number, diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 5bffd21b28..e1e36cdb30 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -13,7 +13,7 @@ use crate::{ }; use async_trait::async_trait; use backoff::BackoffConfig; -use data_types::{KafkaPartition, KafkaTopic, Sequencer}; +use data_types::{KafkaPartition, KafkaTopic, SequenceNumber, Sequencer}; use futures::{ future::{BoxFuture, Shared}, stream::FuturesUnordered, @@ -123,6 +123,7 @@ impl IngestHandlerImpl { write_buffer: Arc, exec: Arc, metric_registry: Arc, + 
skip_to_oldest_available: bool, ) -> Result { // build the initial ingester data state let mut sequencers = BTreeMap::new(); @@ -216,12 +217,14 @@ impl IngestHandlerImpl { let kafka_topic_name = kafka_topic_name.clone(); async move { let handler = SequencedStreamHandler::new( - op_stream.stream().await, + op_stream, + SequenceNumber::new(sequencer.min_unpersisted_sequence_number), sink, lifecycle_handle, kafka_topic_name, sequencer.kafka_partition, &*metric_registry, + skip_to_oldest_available, ); handler.run(shutdown).await @@ -350,6 +353,7 @@ impl Drop for IngestHandlerImpl { #[cfg(test)] mod tests { use super::*; + use crate::data::SnapshotBatch; use data_types::{Namespace, NamespaceSchema, QueryPool, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlWrite}; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; @@ -557,8 +561,11 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn seeks_on_initialization() { + async fn ingester_test_setup( + write_operations: Vec, + min_unpersisted_sequence_number: i64, + skip_to_oldest_available: bool, + ) -> (IngestHandlerImpl, Sequencer, Namespace) { let metrics: Arc = Default::default(); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); @@ -576,11 +583,14 @@ mod tests { .create_or_get(&kafka_topic, kafka_partition) .await .unwrap(); - // update the min unpersisted so we can verify this was what was seeked to later - sequencer.min_unpersisted_sequence_number = 2; + // update the min unpersisted + sequencer.min_unpersisted_sequence_number = min_unpersisted_sequence_number; // this probably isn't necessary, but just in case something changes later txn.sequencers() - .update_min_unpersisted_sequence_number(sequencer.id, SequenceNumber::new(2)) + .update_min_unpersisted_sequence_number( + sequencer.id, + SequenceNumber::new(min_unpersisted_sequence_number), + ) .await .unwrap(); @@ -591,25 +601,14 @@ mod tests { MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap()); let schema = NamespaceSchema::new(namespace.id, kafka_topic.id, query_pool.id); - let ingest_ts1 = Time::from_timestamp_millis(42); - let ingest_ts2 = Time::from_timestamp_millis(1337); - let w1 = DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 20", 0).unwrap(), - DmlMeta::sequenced(Sequence::new(0, 1), ingest_ts1, None, 150), - ); - let _schema = validate_or_insert_schema(w1.tables(), &schema, txn.deref_mut()) - .await - .unwrap() - .unwrap(); - let w2 = DmlWrite::new( - "foo", - lines_to_batches("cpu bar=2 30", 0).unwrap(), - DmlMeta::sequenced(Sequence::new(0, 2), ingest_ts2, None, 150), - ); + for write_operation in write_operations { + validate_or_insert_schema(write_operation.tables(), &schema, txn.deref_mut()) + .await + .unwrap() + .unwrap(); + write_buffer_state.push_write(write_operation); + } txn.commit().await.unwrap(); - write_buffer_state.push_write(w1); - write_buffer_state.push_write(w2); let reading: Arc = Arc::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()); @@ -631,27 +630,32 @@ mod tests { reading, Arc::new(Executor::new(1)), Arc::clone(&metrics), + skip_to_oldest_available, ) .await .unwrap(); + (ingester, sequencer, namespace) + } + + async fn verify_ingester_buffer_has_data( + ingester: IngestHandlerImpl, + sequencer: Sequencer, + namespace: Namespace, + custom_batch_verification: impl Fn(&SnapshotBatch) + Send, + ) { // give the writes some time to go through the buffer. 
Exit once we've verified there's // data in there - tokio::time::timeout(Duration::from_secs(1), async { + tokio::time::timeout(Duration::from_secs(1), async move { loop { let mut has_measurement = false; - if let Some(data) = ingester - .data - .sequencer(sequencer.id) - { + if let Some(data) = ingester.data.sequencer(sequencer.id) { if let Some(data) = data.namespace(&namespace.name) { // verify there's data in the buffer if let Some((b, _)) = data.snapshot("cpu", "1970-01-01").await { if let Some(b) = b.first() { - if b.min_sequencer_number == SequenceNumber::new(1) { - panic!("initialization did a seek to the beginning rather than the min_unpersisted"); - } + custom_batch_verification(b); if b.data.num_rows() == 1 { has_measurement = true; @@ -668,8 +672,76 @@ mod tests { tokio::time::sleep(Duration::from_millis(200)).await; } }) + .await + .expect("timeout"); + } + + #[tokio::test] + async fn seeks_on_initialization() { + let ingest_ts1 = Time::from_timestamp_millis(42); + let ingest_ts2 = Time::from_timestamp_millis(1337); + let write_operations = vec![ + DmlWrite::new( + "foo", + lines_to_batches("cpu bar=2 20", 0).unwrap(), + DmlMeta::sequenced(Sequence::new(0, 1), ingest_ts1, None, 150), + ), + DmlWrite::new( + "foo", + lines_to_batches("cpu bar=2 30", 0).unwrap(), + DmlMeta::sequenced(Sequence::new(0, 2), ingest_ts2, None, 150), + ), + ]; + + let (ingester, sequencer, namespace) = + ingester_test_setup(write_operations, 2, false).await; + + verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| { + if first_batch.min_sequencer_number == SequenceNumber::new(1) { + panic!( + "initialization did a seek to the beginning rather than \ + the min_unpersisted" + ); + } + }) + .await; + } + + #[tokio::test] + #[should_panic(expected = "JoinError::Panic")] + async fn seeks_on_initialization_unknown_sequence_number() { + // No write operations means the stream will return unknown sequence number + // Ingester will panic because skip_to_oldest_available is false + let (ingester, _sequencer, _namespace) = ingester_test_setup(vec![], 2, false).await; + + tokio::time::timeout(Duration::from_millis(1000), ingester.join()) .await - .expect("timeout"); + .unwrap(); + } + + #[tokio::test] + async fn seeks_on_initialization_unknown_sequence_number_skip_to_oldest_available() { + let ingest_ts1 = Time::from_timestamp_millis(42); + let write_operations = vec![DmlWrite::new( + "foo", + lines_to_batches("cpu bar=2 20", 0).unwrap(), + DmlMeta::sequenced(Sequence::new(0, 1), ingest_ts1, None, 150), + )]; + + // Set the min unpersisted to something bigger than the write's sequence number to + // cause an UnknownSequenceNumber error. 
Skip to oldest available = true, so ingester + // should find data + let (ingester, sequencer, namespace) = + ingester_test_setup(write_operations, 10, true).await; + + verify_ingester_buffer_has_data(ingester, sequencer, namespace, |first_batch| { + assert_eq!( + first_batch.min_sequencer_number, + SequenceNumber::new(1), + "re-initialization didn't seek to the beginning", + ); + }) + .await; } struct TestIngester { @@ -730,6 +802,7 @@ mod tests { reading, Arc::new(Executor::new(1)), Arc::clone(&metrics), + false, ) .await .unwrap(); diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 175d5291d8..71a2543a7c 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -1,14 +1,14 @@ use super::DmlSink; use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl}; -use data_types::KafkaPartition; +use data_types::{KafkaPartition, SequenceNumber}; use dml::DmlOperation; -use futures::{pin_mut, FutureExt, Stream, StreamExt}; +use futures::{pin_mut, FutureExt, StreamExt}; use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, U64Counter, U64Gauge}; use observability_deps::tracing::*; use std::{fmt::Debug, time::Duration}; use tokio_util::sync::CancellationToken; -use write_buffer::core::{WriteBufferError, WriteBufferErrorKind}; +use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler}; /// When the [`LifecycleManager`] indicates that ingest should be paused because /// of memory pressure, the sequencer will loop, sleeping this long between @@ -31,8 +31,11 @@ const INGEST_POLL_INTERVAL: Duration = Duration::from_millis(100); /// [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest() #[derive(Debug)] pub struct SequencedStreamHandler { - /// A input stream of DML ops - stream: I, + /// Creator/manager of the stream of DML ops + write_buffer_stream_handler: I, + + current_sequence_number: SequenceNumber, + /// An output sink that processes DML operations and applies them to /// in-memory state. sink: O, @@ -46,17 +49,22 @@ pub struct SequencedStreamHandler { // Metrics time_provider: T, time_to_be_readable_ms: U64Gauge, + /// Duration of time ingest is paused at the request of the LifecycleManager pause_duration_ms: U64Counter, + /// Errors during op stream reading seq_unknown_sequence_number_count: U64Counter, seq_invalid_data_count: U64Counter, seq_unknown_error_count: U64Counter, sink_apply_error_count: U64Counter, + skipped_sequence_number_amount: U64Counter, /// Log context fields - otherwise unused. kafka_topic_name: String, kafka_partition: KafkaPartition, + + skip_to_oldest_available: bool, } impl SequencedStreamHandler { @@ -66,18 +74,22 @@ impl SequencedStreamHandler { /// A [`SequencedStreamHandler`] starts actively consuming items from /// `stream` once [`SequencedStreamHandler::run()`] is called, and /// gracefully stops when `shutdown` is cancelled. 
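+    ///
+    /// A hedged construction sketch, mirroring the wiring used by this
+    /// module's tests (the stream handler, sink, lifecycle, and metrics
+    /// values are stand-ins, not prescribed types):
+    ///
+    /// ```ignore
+    /// let handler = SequencedStreamHandler::new(
+    ///     write_buffer_stream_handler, // impl WriteBufferStreamHandler
+    ///     SequenceNumber::new(0),      // resume point, e.g. from the catalog
+    ///     sink,                        // impl DmlSink
+    ///     lifecycle.handle(),
+    ///     "kafka_topic_name".to_string(),
+    ///     KafkaPartition::new(42),
+    ///     &metrics,
+    ///     false, // skip_to_oldest_available: set true to reset to the
+    ///            // oldest retained offset instead of erroring
+    /// );
+    /// handler.run(shutdown_token).await;
+    /// ```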
+ #[allow(clippy::too_many_arguments)] pub fn new( - stream: I, + write_buffer_stream_handler: I, + current_sequence_number: SequenceNumber, sink: O, lifecycle_handle: LifecycleHandleImpl, kafka_topic_name: String, kafka_partition: KafkaPartition, metrics: &metric::Registry, + skip_to_oldest_available: bool, ) -> Self { // TTBR let time_to_be_readable_ms = metrics.register_metric::( "ingester_ttbr_ms", - "duration of time between producer writing to consumer putting into queryable cache in milliseconds", + "duration of time between producer writing to consumer putting into queryable cache in \ + milliseconds", ).recorder(metric_attrs(kafka_partition, &kafka_topic_name, None, false)); // Lifecycle-driven ingest pause duration @@ -115,9 +127,16 @@ impl SequencedStreamHandler { Some("sink_apply_error"), true, )); + let skipped_sequence_number_amount = ingest_errors.recorder(metric_attrs( + kafka_partition, + &kafka_topic_name, + Some("skipped_sequence_number_amount"), + true, + )); Self { - stream, + write_buffer_stream_handler, + current_sequence_number, sink, lifecycle_handle, time_provider: SystemProvider::default(), @@ -127,8 +146,10 @@ impl SequencedStreamHandler { seq_invalid_data_count, seq_unknown_error_count, sink_apply_error_count, + skipped_sequence_number_amount, kafka_topic_name, kafka_partition, + skip_to_oldest_available, } } @@ -136,7 +157,8 @@ impl SequencedStreamHandler { #[cfg(test)] pub(crate) fn with_time_provider(self, provider: T) -> SequencedStreamHandler { SequencedStreamHandler { - stream: self.stream, + write_buffer_stream_handler: self.write_buffer_stream_handler, + current_sequence_number: self.current_sequence_number, sink: self.sink, lifecycle_handle: self.lifecycle_handle, time_provider: provider, @@ -146,38 +168,42 @@ impl SequencedStreamHandler { seq_invalid_data_count: self.seq_invalid_data_count, seq_unknown_error_count: self.seq_unknown_error_count, sink_apply_error_count: self.sink_apply_error_count, + skipped_sequence_number_amount: self.skipped_sequence_number_amount, kafka_topic_name: self.kafka_topic_name, kafka_partition: self.kafka_partition, + skip_to_oldest_available: self.skip_to_oldest_available, } } } impl SequencedStreamHandler where - I: Stream> + Unpin + Send, + I: WriteBufferStreamHandler, O: DmlSink, T: TimeProvider, { - /// Run the stream handler, consuming items from [`Stream`] and applying - /// them to the [`DmlSink`]. + /// Run the stream handler, consuming items from the stream provided by the + /// [`WriteBufferStreamHandler`] and applying them to the [`DmlSink`]. /// /// This method blocks until gracefully shutdown by cancelling the /// `shutdown` [`CancellationToken`]. Once cancelled, this handler will /// complete the current operation it is processing before this method /// returns. /// - /// # Panics + /// # Panics /// /// This method panics if the input stream ends (yields a `None`). pub async fn run(mut self, shutdown: CancellationToken) { let shutdown_fut = shutdown.cancelled().fuse(); pin_mut!(shutdown_fut); + let mut stream = self.write_buffer_stream_handler.stream().await; + let mut sequence_number_before_reset: Option = None; + loop { - // Wait for a DML operation from the sequencer, or a graceful stop - // signal. + // Wait for a DML operation from the sequencer, or a graceful stop signal. 
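+            // `futures::select!` races the next stream item against the
+            // shutdown future: shutdown is only observed between operations,
+            // so an operation that has already been received is applied to
+            // the sink before `run` returns.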
let maybe_op = futures::select!( - next = self.stream.next().fuse() => next, + next = stream.next().fuse() => next, _ = shutdown_fut => { info!( kafka_topic=%self.kafka_topic_name, @@ -196,17 +222,44 @@ where // DmlSink, return None rather than continuing the loop to ensure // ingest pauses are respected. let maybe_op = match maybe_op { - Some(Ok(op)) => Some(op), + Some(Ok(op)) => { + if let Some(sequence_number) = op.meta().sequence().map(|s| s.sequence_number) { + let sequence_number = SequenceNumber::new(sequence_number as i64); + if let Some(before_reset) = sequence_number_before_reset { + // We've requested the stream to be reset and we've skipped this many + // sequence numbers. Store in a metric once. + if before_reset != sequence_number { + let difference = sequence_number.get() - before_reset.get(); + self.skipped_sequence_number_amount.inc(difference as u64); + } + sequence_number_before_reset = None; + } + self.current_sequence_number = sequence_number; + } + + Some(op) + } Some(Err(e)) if e.kind() == WriteBufferErrorKind::UnknownSequenceNumber => { - error!( - error=%e, - kafka_topic=%self.kafka_topic_name, - kafka_partition=%self.kafka_partition, - potential_data_loss=true, - "unable to read from desired sequencer offset" - ); - self.seq_unknown_sequence_number_count.inc(1); - None + // If we get an unknown sequence number, and we're fine potentially having + // missed writes that were too old to be retained, try resetting the stream + // once and getting the next operation again. + // Keep the current sequence number to compare with the sequence number + if self.skip_to_oldest_available && sequence_number_before_reset.is_none() { + sequence_number_before_reset = Some(self.current_sequence_number); + self.write_buffer_stream_handler.reset_to_earliest(); + stream = self.write_buffer_stream_handler.stream().await; + continue; + } else { + error!( + error=%e, + kafka_topic=%self.kafka_topic_name, + kafka_partition=%self.kafka_partition, + potential_data_loss=true, + "unable to read from desired sequencer offset" + ); + self.seq_unknown_sequence_number_count.inc(1); + None + } } Some(Err(e)) if e.kind() == WriteBufferErrorKind::IO => { warn!( @@ -378,23 +431,24 @@ fn metric_attrs( #[cfg(test)] mod tests { - use std::sync::Arc; - use super::*; use crate::{ lifecycle::{LifecycleConfig, LifecycleManager}, stream_handler::mock_sink::MockDmlSink, }; use assert_matches::assert_matches; + use async_trait::async_trait; use data_types::{DeletePredicate, Sequence, TimestampRange}; use dml::{DmlDelete, DmlMeta, DmlWrite}; - use futures::stream; + use futures::stream::{self, BoxStream}; use iox_time::{SystemProvider, Time}; use metric::Metric; use mutable_batch_lp::lines_to_batches; + use std::sync::Arc; use test_helpers::timeout::FutureTimeout; - use tokio::sync::mpsc; + use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; + use write_buffer::core::WriteBufferError; lazy_static::lazy_static! 
{
        static ref TEST_TIME: Time = SystemProvider::default().now();
    }

@@ -433,6 +487,75 @@ mod tests {
         DmlDelete::new(name, pred, None, sequence)
     }

+    #[derive(Debug)]
+    struct TestWriteBufferStreamHandler {
+        stream_ops: Vec<Vec<Result<DmlOperation, WriteBufferError>>>,
+        #[allow(clippy::type_complexity)]
+        completed_tx:
+            Option<oneshot::Sender<(mpsc::Sender<Result<DmlOperation, WriteBufferError>>, usize)>>,
+    }
+
+    impl TestWriteBufferStreamHandler {
+        fn new(
+            stream_ops: Vec<Vec<Result<DmlOperation, WriteBufferError>>>,
+            completed_tx: oneshot::Sender<(
+                mpsc::Sender<Result<DmlOperation, WriteBufferError>>,
+                usize,
+            )>,
+        ) -> Self {
+            Self {
+                // reverse the order so we can pop off the end
+                stream_ops: stream_ops.into_iter().rev().collect(),
+                completed_tx: Some(completed_tx),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl WriteBufferStreamHandler for TestWriteBufferStreamHandler {
+        async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
+            let stream_ops = self.stream_ops.pop().unwrap();
+
+            // Create a channel to pass input to the handler, with a
+            // buffer capacity of the number of operations to send (used to tell if all
+            // values have been received in the test thread).
+            let capacity = if stream_ops.is_empty() {
+                1 // channels can't have capacity 0, even if we're never sending anything
+            } else {
+                stream_ops.len()
+            };
+            let (tx, rx) = mpsc::channel(capacity);
+
+            // Push all inputs
+            for op in stream_ops {
+                tx.send(op)
+                    .with_timeout_panic(Duration::from_secs(5))
+                    .await
+                    .expect("early handler exit");
+            }
+
+            // If this is the last expected call to `stream()`, send the transmitter and the
+            // capacity back to the test thread to wait for completion.
+            if self.stream_ops.is_empty() {
+                self.completed_tx
+                    .take()
+                    .unwrap()
+                    .send((tx, capacity))
+                    .unwrap();
+            }
+
+            ReceiverStream::new(rx).boxed()
+        }
+
+        async fn seek(&mut self, _sequence_number: u64) -> Result<(), WriteBufferError> {
+            Ok(())
+        }
+
+        fn reset_to_earliest(&mut self) {
+            // Intentionally left blank
+        }
+    }
+
     // Generates a test that ensures that the handler given $stream_ops makes
     // $want_sink calls.
     //
@@ -441,20 +564,24 @@ mod tests {
     macro_rules! test_stream_handler {
         (
             $name:ident,
-            stream_ops = $stream_ops:expr,  // An ordered set of stream items to feed to the handler
-            sink_rets = $sink_ret:expr,     // An ordered set of values to return from the mock op sink
+            // Whether to skip to the oldest available sequence number if UnknownSequenceNumber
+            skip_to_oldest_available = $skip_to_oldest_available:expr,
+            stream_ops = $stream_ops:expr, // Ordered set of stream items to feed to the handler
+            sink_rets = $sink_ret:expr,    // Ordered set of values to return from the mock op sink
             want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds
             // Optional set of ingest error metric label / values to assert
             want_err_metrics = [$($metric_name:literal => $metric_count:literal),*],
-            want_sink = $($want_sink:tt)+  // A pattern to match against the calls made to the op sink
+            want_sink = $($want_sink:tt)+  // Pattern to match against calls made to the op sink
         ) => {
             paste::paste!
{ #[tokio::test] async fn []() { let metrics = Arc::new(metric::Registry::default()); - let time_provider: Arc< dyn TimeProvider> = Arc::new(SystemProvider::default()); + let time_provider: Arc = Arc::new(SystemProvider::default()); let lifecycle = LifecycleManager::new( - LifecycleConfig::new(100, 2, 3, Duration::from_secs(4), Duration::from_secs(5)), + LifecycleConfig::new( + 100, 2, 3, Duration::from_secs(4), Duration::from_secs(5) + ), Arc::clone(&metrics), time_provider, ); @@ -465,17 +592,21 @@ mod tests { .with_apply_return($sink_ret) ); - // Create an channel to pass input to the handler, with a - // buffer capacity of 1 (used below). - let (tx, rx) = mpsc::channel(1); + let (completed_tx, completed_rx) = oneshot::channel(); + let write_buffer_stream_handler = TestWriteBufferStreamHandler::new( + $stream_ops, + completed_tx + ); let handler = SequencedStreamHandler::new( - ReceiverStream::new(rx), + write_buffer_stream_handler, + SequenceNumber::new(0), Arc::clone(&sink), lifecycle.handle(), TEST_KAFKA_TOPIC.to_string(), *TEST_KAFKA_PARTITION, &*metrics, + $skip_to_oldest_available, ).with_time_provider(iox_time::MockProvider::new(*TEST_TIME)); // Run the handler in the background and push inputs to it @@ -485,24 +616,22 @@ mod tests { handler.run(handler_shutdown).await; }); - // Push the input one at a time, wait for the the last - // message to be consumed by the handler (channel capacity - // increases to 1 once the message is read) and then request - // a graceful shutdown. - for op in $stream_ops { - tx.send(op) - .with_timeout_panic(Duration::from_secs(5)) - .await - .expect("early handler exit"); - } - // Wait for the handler to read the last op, restoring the - // capacity to 1. - let _permit = tx.reserve() - .with_timeout_panic(Duration::from_secs(5)) - .await - .expect("early handler exit"); + // When all operations have been read through the TestWriteBufferStreamHandler, + let (tx, capacity) = completed_rx.await.unwrap(); - // Trigger graceful shutdown + // Wait for the handler to read the last op, + async { + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + if tx.capacity() == capacity { + return; + } + } + }.with_timeout_panic(Duration::from_secs(5)) + .await; + + + // Then trigger graceful shutdown shutdown.cancel(); // And wait for the handler to stop. @@ -548,7 +677,8 @@ mod tests { test_stream_handler!( immediate_shutdown, - stream_ops = [], + skip_to_oldest_available = false, + stream_ops = vec![vec![]], sink_rets = [], want_ttbr = 0, // No ops, no TTBR want_err_metrics = [], @@ -558,8 +688,9 @@ mod tests { // Single write op applies OK, then shutdown. test_stream_handler!( write_ok, - stream_ops = [ - Ok(DmlOperation::Write(make_write("bananas", 42))) + skip_to_oldest_available = false, + stream_ops = vec![ + vec![Ok(DmlOperation::Write(make_write("bananas", 42)))] ], sink_rets = [Ok(true)], want_ttbr = 42, @@ -572,8 +703,9 @@ mod tests { // Single delete op applies OK, then shutdown. test_stream_handler!( delete_ok, - stream_ops = [ - Ok(DmlOperation::Delete(make_delete("platanos", 24))) + skip_to_oldest_available = false, + stream_ops = vec![ + vec![Ok(DmlOperation::Delete(make_delete("platanos", 24)))] ], sink_rets = [Ok(true)], want_ttbr = 24, @@ -587,10 +719,11 @@ mod tests { // affect the next op in the stream. 
test_stream_handler!( non_fatal_stream_io_error, - stream_ops = [ + skip_to_oldest_available = false, + stream_ops = vec![vec![ Err(WriteBufferError::new(WriteBufferErrorKind::IO, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 13))) - ], + ]], sink_rets = [Ok(true)], want_ttbr = 13, want_err_metrics = [ @@ -598,7 +731,8 @@ mod tests { "sequencer_unknown_sequence_number" => 0, "sequencer_invalid_data" => 0, "sequencer_unknown_error" => 0, - "sink_apply_error" => 0 + "sink_apply_error" => 0, + "skipped_sequence_number_amount" => 0 ], want_sink = [DmlOperation::Write(op)] => { assert_eq!(op.namespace(), "bananas"); @@ -606,17 +740,46 @@ mod tests { ); test_stream_handler!( non_fatal_stream_offset_error, - stream_ops = [ + skip_to_oldest_available = false, + stream_ops = vec![vec![ Err(WriteBufferError::new(WriteBufferErrorKind::UnknownSequenceNumber, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 31))) - ], + ]], sink_rets = [Ok(true)], want_ttbr = 31, want_err_metrics = [ "sequencer_unknown_sequence_number" => 1, "sequencer_invalid_data" => 0, "sequencer_unknown_error" => 0, - "sink_apply_error" => 0 + "sink_apply_error" => 0, + "skipped_sequence_number_amount" => 0 + ], + want_sink = [DmlOperation::Write(op)] => { + assert_eq!(op.namespace(), "bananas"); + } + ); + test_stream_handler!( + skip_to_oldest_on_unknown_sequence_number, + skip_to_oldest_available = true, + stream_ops = vec![ + vec![ + Err( + WriteBufferError::new( + WriteBufferErrorKind::UnknownSequenceNumber, + "explosions" + ) + ) + ], + vec![Ok(DmlOperation::Write(make_write("bananas", 31)))], + ], + sink_rets = [Ok(true)], + want_ttbr = 31, + want_err_metrics = [ + "sequencer_unknown_sequence_number" => 0, + "sequencer_invalid_data" => 0, + "sequencer_unknown_error" => 0, + "sink_apply_error" => 0, + "skipped_sequence_number_amount" => 2 ], want_sink = [DmlOperation::Write(op)] => { assert_eq!(op.namespace(), "bananas"); @@ -624,17 +787,19 @@ mod tests { ); test_stream_handler!( non_fatal_stream_invalid_data, - stream_ops = [ + skip_to_oldest_available = false, + stream_ops = vec![vec![ Err(WriteBufferError::new(WriteBufferErrorKind::InvalidData, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 50))) - ], + ]], sink_rets = [Ok(true)], want_ttbr = 50, want_err_metrics = [ "sequencer_unknown_sequence_number" => 0, "sequencer_invalid_data" => 1, "sequencer_unknown_error" => 0, - "sink_apply_error" => 0 + "sink_apply_error" => 0, + "skipped_sequence_number_amount" => 0 ], want_sink = [DmlOperation::Write(op)] => { assert_eq!(op.namespace(), "bananas"); @@ -642,17 +807,19 @@ mod tests { ); test_stream_handler!( non_fatal_stream_unknown_error, - stream_ops = [ + skip_to_oldest_available = false, + stream_ops = vec![vec![ Err(WriteBufferError::new(WriteBufferErrorKind::Unknown, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 60))) - ], + ]], sink_rets = [Ok(true)], want_ttbr = 60, want_err_metrics = [ "sequencer_unknown_sequence_number" => 0, "sequencer_invalid_data" => 0, "sequencer_unknown_error" => 1, - "sink_apply_error" => 0 + "sink_apply_error" => 0, + "skipped_sequence_number_amount" => 0 ], want_sink = [DmlOperation::Write(op)] => { assert_eq!(op.namespace(), "bananas"); @@ -662,10 +829,11 @@ mod tests { // Asserts the TTBR is not set unless an op is successfully sunk. 
test_stream_handler!( no_success_no_ttbr, - stream_ops = [Err(WriteBufferError::new( + skip_to_oldest_available = false, + stream_ops = vec![vec![Err(WriteBufferError::new( WriteBufferErrorKind::IO, "explosions" - )),], + ))]], sink_rets = [], want_ttbr = 0, want_err_metrics = [], @@ -675,12 +843,13 @@ mod tests { // Asserts the TTBR is uses the last value in the stream. test_stream_handler!( reports_last_ttbr, - stream_ops = [ + skip_to_oldest_available = false, + stream_ops = vec![vec![ Ok(DmlOperation::Write(make_write("bananas", 1))), Ok(DmlOperation::Write(make_write("bananas", 2))), Ok(DmlOperation::Write(make_write("bananas", 3))), Ok(DmlOperation::Write(make_write("bananas", 42))), - ], + ]], sink_rets = [Ok(true), Ok(false), Ok(true), Ok(false),], want_ttbr = 42, want_err_metrics = [ @@ -688,7 +857,8 @@ mod tests { "sequencer_unknown_sequence_number" => 0, "sequencer_invalid_data" => 0, "sequencer_unknown_error" => 0, - "sink_apply_error" => 0 + "sink_apply_error" => 0, + "skipped_sequence_number_amount" => 0 ], want_sink = _ ); @@ -697,12 +867,15 @@ mod tests { // the next op in the stream from being processed. test_stream_handler!( non_fatal_sink_error, - stream_ops = [ + skip_to_oldest_available = false, + stream_ops = vec![vec![ Ok(DmlOperation::Write(make_write("bad_op", 1))), Ok(DmlOperation::Write(make_write("good_op", 2))) - ], + ]], sink_rets = [ - Err(crate::data::Error::Partitioning{source: String::from("Time column not present").into()}), + Err(crate::data::Error::Partitioning { + source: String::from("Time column not present").into() + }), Ok(true), ], want_ttbr = 2, @@ -710,7 +883,8 @@ mod tests { "sequencer_unknown_sequence_number" => 0, "sequencer_invalid_data" => 0, "sequencer_unknown_error" => 0, - "sink_apply_error" => 1 + "sink_apply_error" => 1, + "skipped_sequence_number_amount" => 0 ], want_sink = [ DmlOperation::Write(_), // First call into sink is bad_op, returning an error @@ -720,10 +894,30 @@ mod tests { } ); - // An abnormal end to the steam causes a panic, rather than a silent stream - // reader exit. + #[derive(Debug)] + struct EmptyWriteBufferStreamHandler {} + + #[async_trait] + impl WriteBufferStreamHandler for EmptyWriteBufferStreamHandler { + async fn stream(&mut self) -> BoxStream<'static, Result> { + stream::iter([]).boxed() + } + + async fn seek(&mut self, _sequence_number: u64) -> Result<(), WriteBufferError> { + Ok(()) + } + + fn reset_to_earliest(&mut self) { + // Intentionally left blank + } + } + + // An abnormal end to the steam causes a panic, rather than a silent stream reader exit. #[tokio::test] - #[should_panic = "sequencer KafkaPartition(42) stream for topic kafka_topic_name ended without graceful shutdown"] + #[should_panic( + expected = "sequencer KafkaPartition(42) stream for topic kafka_topic_name ended without \ + graceful shutdown" + )] async fn test_early_stream_end_panic() { let metrics = Arc::new(metric::Registry::default()); let time_provider = Arc::new(SystemProvider::default()); @@ -734,16 +928,18 @@ mod tests { ); // An empty stream iter immediately yields none. 
- let stream = stream::iter([]); + let write_buffer_stream_handler = EmptyWriteBufferStreamHandler {}; let sink = MockDmlSink::default(); let handler = SequencedStreamHandler::new( - stream, + write_buffer_stream_handler, + SequenceNumber::new(0), sink, lifecycle.handle(), "kafka_topic_name".to_string(), KafkaPartition::new(42), &*metrics, + false, ); handler diff --git a/ioxd_ingester/src/lib.rs b/ioxd_ingester/src/lib.rs index d205dc4e1f..c4631fd838 100644 --- a/ioxd_ingester/src/lib.rs +++ b/ioxd_ingester/src/lib.rs @@ -193,6 +193,7 @@ pub async fn create_ingester_server_type( write_buffer, exec, Arc::clone(&metric_registry), + ingester_config.skip_to_oldest_available, ) .await?, ); diff --git a/logfmt/src/lib.rs b/logfmt/src/lib.rs index 408a897d29..f87581dfac 100644 --- a/logfmt/src/lib.rs +++ b/logfmt/src/lib.rs @@ -392,7 +392,7 @@ mod test { fn quote_not_printable() { assert_eq!(quote_and_escape("foo\nbar"), r#""foo\nbar""#); assert_eq!(quote_and_escape("foo\r\nbar"), r#""foo\r\nbar""#); - assert_eq!(quote_and_escape("foo\0bar"), r#""foo\u{0}bar""#); + assert_eq!(quote_and_escape("foo\0bar"), r#""foo\0bar""#); } #[test] diff --git a/query_functions/Cargo.toml b/query_functions/Cargo.toml index 1ed6884a6b..9cf48c0bc8 100644 --- a/query_functions/Cargo.toml +++ b/query_functions/Cargo.toml @@ -13,7 +13,7 @@ itertools = "0.10.2" lazy_static = "1.4.0" observability_deps = { path = "../observability_deps" } regex = "1" -regex-syntax = "0.6.25" +regex-syntax = "0.6.26" schema = { path = "../schema" } snafu = "0.7" workspace-hack = { path = "../workspace-hack"} diff --git a/query_tests/cases/in/two_chunks_missing_columns.expected b/query_tests/cases/in/two_chunks_missing_columns.expected new file mode 100644 index 0000000000..8694f327af --- /dev/null +++ b/query_tests/cases/in/two_chunks_missing_columns.expected @@ -0,0 +1,8 @@ +-- Test Setup: TwoChunksMissingColumns +-- SQL: SELECT * from "table" order by time; ++--------+--------+--------+------+------+------+--------------------------------+ +| field1 | field2 | field3 | tag1 | tag2 | tag3 | time | ++--------+--------+--------+------+------+------+--------------------------------+ +| 10 | 11 | | a | b | | 1970-01-01T00:00:00.000000100Z | +| 20 | | 22 | a | | c | 1970-01-01T00:00:00.000000200Z | ++--------+--------+--------+------+------+------+--------------------------------+ diff --git a/query_tests/cases/in/two_chunks_missing_columns.sql b/query_tests/cases/in/two_chunks_missing_columns.sql new file mode 100644 index 0000000000..6f0cba621c --- /dev/null +++ b/query_tests/cases/in/two_chunks_missing_columns.sql @@ -0,0 +1,5 @@ +-- Basic query tests +-- IOX_SETUP: TwoChunksMissingColumns + +-- query data +SELECT * from "table" order by time; diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs index e046e395ca..aee30e211e 100644 --- a/query_tests/src/cases.rs +++ b/query_tests/src/cases.rs @@ -2,7 +2,7 @@ //! This file is auto generated by query_tests/generate. //! 
Do not edit manually --> will result in sadness use std::path::Path; -use crate::runner::{Runner, make_output_path, read_file}; +use crate::runner::Runner; #[tokio::test] // Tests from "basic.sql", @@ -117,9 +117,9 @@ async fn test_cases_new_sql_system_tables_sql() { } #[tokio::test] -// Tests from "two_chunks.sql", -async fn test_cases_two_sql() { - let input_path = Path::new("cases").join("in").join("two_chunks.sql"); +// Tests from "pushdown.sql", +async fn test_cases_pushdown_sql() { + let input_path = Path::new("cases").join("in").join("pushdown.sql"); let mut runner = Runner::new(); runner .run(input_path) @@ -134,28 +134,6 @@ async fn test_cases_two_sql() { // Tests from "several_chunks.sql", async fn test_cases_several_chunks_sql() { let input_path = Path::new("cases").join("in").join("several_chunks.sql"); - let output_path = make_output_path(&input_path).unwrap(); - let expected_path = input_path.with_extension("expected"); - - let mut runner = Runner::new(); - let result = runner - .run(input_path) - .await; - if result.is_err() { - let output_contents = read_file(&output_path); - let expected_contents = read_file(&expected_path); - pretty_assertions::assert_eq!(expected_contents, output_contents); - } else { - runner - .flush() - .expect("flush worked"); - } -} - -#[tokio::test] -// Tests from "pushdown.sql", -async fn test_cases_pushdown_sql() { - let input_path = Path::new("cases").join("in").join("pushdown.sql"); let mut runner = Runner::new(); runner .run(input_path) @@ -192,4 +170,32 @@ async fn test_cases_timestamps_sql() { runner .flush() .expect("flush worked"); +} + +#[tokio::test] +// Tests from "two_chunks.sql", +async fn test_cases_two_chunks_sql() { + let input_path = Path::new("cases").join("in").join("two_chunks.sql"); + let mut runner = Runner::new(); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); +} + +#[tokio::test] +// Tests from "two_chunks_missing_columns.sql", +async fn test_cases_two_chunks_missing_columns_sql() { + let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql"); + let mut runner = Runner::new(); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } \ No newline at end of file diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 22e42fd088..30584ed6d4 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -61,6 +61,7 @@ pub fn get_all_setups() -> &'static HashMap> { register_setup!(OneMeasurementRealisticTimes), register_setup!(TwoMeasurementsManyFieldsTwoChunks), register_setup!(ManyFieldsSeveralChunks), + register_setup!(TwoChunksMissingColumns), ] .into_iter() .map(|(name, setup)| (name.to_string(), setup as Arc)) diff --git a/query_tests/src/scenarios/library.rs b/query_tests/src/scenarios/library.rs index 0c5dfd59ac..301fc1352b 100644 --- a/query_tests/src/scenarios/library.rs +++ b/query_tests/src/scenarios/library.rs @@ -1322,3 +1322,30 @@ impl DbSetup for MeasurementForDefect2890 { all_scenarios_for_one_chunk(vec![], vec![], lp, "mm", partition_key).await } } + +#[derive(Debug)] +pub struct TwoChunksMissingColumns {} +#[async_trait] +impl DbSetup for TwoChunksMissingColumns { + async fn make(&self) -> Vec { + let partition_key1 = "a"; + let partition_key2 = "b"; + + let lp_lines1 = vec!["table,tag1=a,tag2=b field1=10,field2=11 100"]; + let lp_lines2 = vec!["table,tag1=a,tag3=c field1=20,field3=22 200"]; + + make_n_chunks_scenario(&[ + 
ChunkData { + lp_lines: lp_lines1, + partition_key: partition_key1, + ..Default::default() + }, + ChunkData { + lp_lines: lp_lines2, + partition_key: partition_key2, + ..Default::default() + }, + ]) + .await + } +} diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 553ca62d3e..a32daacfe3 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -94,7 +94,7 @@ impl<'a, 'b> ChunkData<'a, 'b> { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ChunkStage { /// In parquet file. Parquet, @@ -383,25 +383,17 @@ pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec]) -> Vec>::new(); + for chunk_data in &chunks { + stage_by_partition + .entry(chunk_data.partition_key) + .or_default() + .push( + chunk_data + .chunk_stage + .expect("Stage should be initialized by now"), + ); + } + for stages in stage_by_partition.values() { + if !stages.windows(2).all(|stages| { + stages[0] + .partial_cmp(&stages[1]) + .map(|o| o.is_le()) + .unwrap_or_default() + }) { + continue 'stage_combinations; + } + } + + // build scenario + let mut scenario_name = format!("{} chunks:", chunks.len()); + let mut mock_ingester = MockIngester::new().await; + + for chunk_data in chunks { let name = make_chunk(&mut mock_ingester, chunk_data).await; write!(&mut scenario_name, ", {}", name).unwrap(); } - assert!(stages_it.next().is_none(), "generated too many stages"); - let db = mock_ingester.into_query_namespace().await; scenarios.push(DbScenario { scenario_name, db }); } @@ -550,7 +572,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>) } } - let mut name = format!("Chunk {}", chunk_stage); + let mut name = format!( + "Chunk stage={} partition={}", + chunk_stage, chunk.partition_key + ); let n_preds = chunk.preds.len(); if n_preds > 0 { let delete_names: Vec<_> = chunk diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 1e0347218d..4e832fcec3 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.60" +channel = "1.61" components = [ "rustfmt", "clippy" ] diff --git a/service_grpc_influxrpc/Cargo.toml b/service_grpc_influxrpc/Cargo.toml index 3b9347a6ab..1d6519aef3 100644 --- a/service_grpc_influxrpc/Cargo.toml +++ b/service_grpc_influxrpc/Cargo.toml @@ -20,7 +20,7 @@ arrow = { version = "14.0.0", features = ["prettyprint"] } async-trait = "0.1" futures = "0.3" prost = "0.10" -regex = "1.5.4" +regex = "1.5.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.81" snafu = "0.7" diff --git a/tracker/src/lock.rs b/tracker/src/lock.rs index 88bf3f69be..0f067dfdc2 100644 --- a/tracker/src/lock.rs +++ b/tracker/src/lock.rs @@ -294,9 +294,12 @@ unsafe impl lock_api::RawRwLockUpgrade #[cfg(test)] mod tests { - use std::time::Duration; + // Clippy isn't recognizing the explicit drops; none of these locks are actually being held + // across await points. See + #![allow(clippy::await_holding_lock)] use super::*; + use std::time::Duration; #[test] fn test_counts() { diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index d4dc117f7c..95c5556e30 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -136,10 +136,11 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static { /// /// The [`dml::DmlMeta`] will be propagated where applicable /// - /// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations into a single batch. 
-    /// After this method returns the operation was actually written (i.e. it is NOT buffered any longer). You may use
-    /// [`flush`](Self::flush) to trigger an early submission (e.g. before some linger time expired), which can be
-    /// helpful for controlled shutdown.
+    /// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations
+    /// into a single batch. After this method returns the operation was actually written (i.e. it
+    /// is NOT buffered any longer). You may use [`flush`](Self::flush) to trigger an early
+    /// submission (e.g. before some linger time expired), which can be helpful for controlled
+    /// shutdown.
     ///
     /// Returns the metadata that was written.
     async fn store_operation(
@@ -168,8 +169,9 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
     /// Flush all currently blocking store operations ([`store_operation`](Self::store_operation) /
     /// [`store_lp`](Self::store_lp)).
     ///
-    /// This call is pending while outstanding data is being submitted and will return AFTER the flush completed.
-    /// However you still need to poll the store operations to get the metadata for every write.
+    /// This call is pending while outstanding data is being submitted and will return AFTER the
+    /// flush completed. However you still need to poll the store operations to get the metadata
+    /// for every write.
     async fn flush(&self) -> Result<(), WriteBufferError>;

     /// Return type (like `"mock"` or `"kafka"`) of this writer.
@@ -183,21 +185,43 @@ pub trait WriteBufferStreamHandler: Sync + Send + Debug + 'static {
     /// Stream that produces DML operations.
     ///
-    /// Note that due to the mutable borrow, it is not possible to have multiple streams from the same
-    /// [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and requested again, the last
-    /// sequence number of the old streams will be the start sequence number for the new streams. If you want to
-    /// prevent that either create a new [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
+    /// Note that due to the mutable borrow, it is not possible to have multiple streams from the
+    /// same [`WriteBufferStreamHandler`] instance at the same time. If all streams are dropped and
+    /// requested again, the last sequence number of the old streams will be the start sequence
+    /// number for the new streams. If you want to prevent that either create a new
+    /// [`WriteBufferStreamHandler`] or use [`seek`](Self::seek).
     ///
-    /// If the sequence number that the stream wants to read is unknown (either because it is in the future or because
-    /// some retention policy removed it already), the stream will return an error with
-    /// [`WriteBufferErrorKind::UnknownSequenceNumber`] and will end immediately.
-    async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>>;
+    /// If the sequence number that the stream wants to read is unknown (either because it is in
+    /// the future or because some retention policy removed it already), the stream will return an
+    /// error with [`WriteBufferErrorKind::UnknownSequenceNumber`] and will end immediately.
+    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>>;

-    /// Seek sequencer to given sequence number. The next output of related streams will be an entry with at least
-    /// the given sequence number (the actual sequence number might be skipped due to "holes" in the stream).
+    /// Seek sequencer to given sequence number. The next output of related streams will be an
+    /// entry with at least the given sequence number (the actual sequence number might be skipped
+    /// due to "holes" in the stream).
     ///
     /// Note that due to the mutable borrow, it is not possible to seek while streams exists.
     async fn seek(&mut self, sequence_number: u64) -> Result<(), WriteBufferError>;
+
+    /// Reset the sequencer to whatever is the earliest number available in the retained write
+    /// buffer. Useful to restart if [`WriteBufferErrorKind::UnknownSequenceNumber`] is returned
+    /// from [`stream`](Self::stream) but that isn't a problem.
+    fn reset_to_earliest(&mut self);
+}
+
+#[async_trait]
+impl WriteBufferStreamHandler for Box<dyn WriteBufferStreamHandler> {
+    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
+        self.as_mut().stream().await
+    }
+
+    async fn seek(&mut self, sequence_number: u64) -> Result<(), WriteBufferError> {
+        self.as_mut().seek(sequence_number).await
+    }
+
+    fn reset_to_earliest(&mut self) {
+        self.as_mut().reset_to_earliest()
+    }
 }

 /// Produce streams (one per sequencer) of [`DmlWrite`]s.
@@ -231,8 +255,9 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {

     /// Get high watermark (= what we believe is the next sequence number to be added).
     ///
-    /// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to be added", it starts
-    /// at 0 and after the entry with sequence number 0 is added to the buffer, it is 1.
+    /// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to
+    /// be added", it starts at 0 and after the entry with sequence number 0 is added to the
+    /// buffer, it is 1.
     async fn fetch_high_watermark(&self, sequencer_id: u32) -> Result<u64, WriteBufferError>;

     /// Return type (like `"mock"` or `"kafka"`) of this reader.
@@ -273,8 +298,8 @@ pub mod test_utils {

         /// Create a new context.
         ///
-        /// This will be called multiple times during the test suite. Each resulting context must represent an isolated
-        /// environment.
+        /// This will be called multiple times during the test suite. Each resulting context must
+        /// represent an isolated environment.
         async fn new_context(&self, n_sequencers: NonZeroU32) -> Self::Context {
             self.new_context_with_time(n_sequencers, Arc::new(iox_time::SystemProvider::new()))
                 .await
@@ -289,7 +314,8 @@ pub mod test_utils {

     /// Context used during testing.
     ///
-    /// Represents an isolated environment. Actions like sequencer creations and writes must not leak across context boundaries.
+    /// Represents an isolated environment. Actions like sequencer creations and writes must not
+    /// leak across context boundaries.
     #[async_trait]
     pub trait TestContext: Send + Sync {
         /// Write buffer writer implementation specific to this context and adapter.
@@ -310,10 +336,11 @@ pub mod test_utils {

     /// Generic test suite that must be passed by all proper write buffer implementations.
     ///
-    /// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this test suite.
+    /// See [`TestAdapter`] for how to make a concrete write buffer implementation work with this
+    /// test suite.
     ///
-    /// Note that you might need more tests on top of this to assert specific implementation behaviors, edge cases, and
-    /// error handling.
+    /// Note that you might need more tests on top of this to assert specific implementation
+    /// behaviors, edge cases, and error handling.
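+    ///
+    /// A hedged usage sketch (`MyAdapter` stands in for a concrete
+    /// [`TestAdapter`] implementation; it is not defined in this crate):
+    ///
+    /// ```ignore
+    /// #[tokio::test]
+    /// async fn test_write_buffer() {
+    ///     perform_generic_tests(MyAdapter::default()).await;
+    /// }
+    /// ```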
     pub async fn perform_generic_tests<T>(adapter: T)
     where
         T: TestAdapter,
@@ -323,6 +350,7 @@ pub mod test_utils {
         test_multi_sequencer_io(&adapter).await;
         test_multi_writer_multi_reader(&adapter).await;
         test_seek(&adapter).await;
+        test_reset_to_earliest(&adapter).await;
         test_watermark(&adapter).await;
         test_timestamp(&adapter).await;
         test_sequencer_auto_creation(&adapter).await;
@@ -366,6 +394,7 @@ pub mod test_utils {
     /// Test IO with a single writer and single reader stream.
     ///
     /// This tests that:
+    ///
     /// - streams process data in order
     /// - readers can handle the "pending" state w/o erroring
    /// - readers unblock after being in "pending" state
@@ -410,6 +439,7 @@ pub mod test_utils {
     /// Tests multiple subsequently created streams from a single [`WriteBufferStreamHandler`].
     ///
     /// This tests that:
+    ///
     /// - readers remember their sequence number (and "pending" state) even when streams are dropped
     /// - state is not shared between handlers
     async fn test_multi_stream_io<T>(adapter: &T)
@@ -437,8 +467,8 @@ pub mod test_utils {
         let mut stream = stream_handler.stream().await;
         assert_write_op_eq(&stream.next().await.unwrap().unwrap(), &w1);
 
-        // re-creating stream after reading remembers sequence number, but wait a bit to provoke the stream to buffer
-        // some entries
+        // re-creating stream after reading remembers sequence number, but wait a bit to provoke
+        // the stream to buffer some entries
         tokio::time::sleep(Duration::from_millis(10)).await;
         drop(stream);
         let mut stream = stream_handler.stream().await;
@@ -460,7 +490,9 @@ pub mod test_utils {
     /// Test single reader-writer IO w/ multiple sequencers.
     ///
     /// This tests that:
-    /// - writes go to and reads come from the right sequencer, aka that sequencers provide a namespace-like isolation
+    ///
+    /// - writes go to and reads come from the right sequencer, aka that sequencers provide a
+    ///   namespace-like isolation
     /// - "pending" states are specific to a sequencer
     async fn test_multi_sequencer_io<T>(adapter: &T)
     where
@@ -512,6 +544,7 @@ pub mod test_utils {
     /// Test multiple writers and multiple readers on multiple sequencers.
     ///
     /// This tests that:
+    ///
     /// - writers retrieve consistent sequencer IDs
     /// - writes go to and reads come from the right sequencer, similar
     ///   to [`test_multi_sequencer_io`] but less detailed
@@ -556,10 +589,11 @@ pub mod test_utils {
     /// Test seek implementation of readers.
     ///
     /// This tests that:
+    ///
     /// - seeking is specific to the reader AND sequencer
     /// - forward and backwards seeking works
-    /// - seeking past the end of the known content works (results in "pending" status and remembers sequence number and
-    ///   not just "next entry")
+    /// - seeking past the end of the known content works (results in "pending" status and
+    ///   remembers sequence number and not just "next entry")
     async fn test_seek<T>(adapter: &T)
     where
         T: TestAdapter,
@@ -609,7 +643,8 @@ pub mod test_utils {
         assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
 
         // seek to far end and then add data
-        // The affected stream should error and then stop. The other streams should still be pending.
+        // The affected stream should error and then stop. The other streams should still be
+        // pending.
         handler_1_1_a.seek(1_000_000).await.unwrap();
         let w_east_3 = write("namespace", &writer, entry_east_3, 0, None).await;
 
@@ -634,9 +669,55 @@ pub mod test_utils {
         assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2, &w_east_3]).await;
     }
 
+    /// Test reset to earliest implementation of readers.
+    ///
+    /// This tests that:
+    ///
+    /// - Calling the function jumps to the earliest available sequence number if the earliest
+    ///   available sequence number is earlier than the current sequence number
+    /// - Calling the function jumps to the earliest available sequence number if the earliest
+    ///   available sequence number is later than the current sequence number
+    async fn test_reset_to_earliest<T>(adapter: &T)
+    where
+        T: TestAdapter,
+    {
+        let context = adapter.new_context(NonZeroU32::try_from(2).unwrap()).await;
+
+        let entry_east_1 = "upc,region=east user=1 100";
+        let entry_east_2 = "upc,region=east user=2 200";
+
+        let writer = context.writing(true).await.unwrap();
+
+        let mut sequencer_ids = writer.sequencer_ids();
+        let sequencer_id_1 = set_pop_first(&mut sequencer_ids).unwrap();
+
+        let w_east_1 = write("namespace", &writer, entry_east_1, sequencer_id_1, None).await;
+        let w_east_2 = write("namespace", &writer, entry_east_2, sequencer_id_1, None).await;
+
+        let reader_1 = context.reading(true).await.unwrap();
+
+        let mut handler_1_1_a = reader_1.stream_handler(sequencer_id_1).await.unwrap();
+
+        // forward seek
+        handler_1_1_a
+            .seek(w_east_2.meta().sequence().unwrap().sequence_number)
+            .await
+            .unwrap();
+        assert_reader_content(&mut handler_1_1_a, &[&w_east_2]).await;
+
+        // reset to earliest goes back to 0; stream re-fetches earliest record
+        handler_1_1_a.reset_to_earliest();
+        assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
+
+        // TODO: https://github.com/influxdata/influxdb_iox/issues/4651
+        // Remove first write operation to simulate retention policies evicting some records;
+        // reset to earliest then goes to whatever's available
+    }
+
     /// Test watermark fetching.
     ///
     /// This tests that:
+    ///
     /// - watermarks for empty sequencers are 0
     /// - watermarks for non-empty sequencers are "last sequence ID plus 1"
     async fn test_watermark<T>(adapter: &T)
@@ -722,6 +803,7 @@ pub mod test_utils {
     /// Test that sequencer auto-creation works.
     ///
     /// This tests that:
+    ///
     /// - both writer and reader cannot be constructed when sequencers are missing
     /// - both writer and reader can auto-create sequencers
     async fn test_sequencer_auto_creation<T>(adapter: &T)
@@ -749,6 +831,7 @@ pub mod test_utils {
     /// Test sequencer IDs reporting of readers and writers.
     ///
     /// This tests that:
+    ///
     /// - all sequencers are reported
     async fn test_sequencer_ids<T>(adapter: &T)
     where
@@ -864,6 +947,7 @@ pub mod test_utils {
     /// Test usage w/ multiple namespaces.
     ///
     /// Tests that:
+    ///
     /// - namespace names are propagated correctly from writer to reader
     /// - all namespaces end up in a single stream
     async fn test_multi_namespaces<T>(adapter: &T)
@@ -928,14 +1012,16 @@ pub mod test_utils {
     /// Assert that the content of the reader is as expected.
     ///
-    /// This will read `expected_writes.len()` from the reader and then ensures that the stream is pending.
+    /// This will read `expected_writes.len()` entries from the reader and then ensure that the
+    /// stream is pending.
     async fn assert_reader_content(
         actual_stream_handler: &mut Box<dyn WriteBufferStreamHandler>,
         expected_writes: &[&DmlWrite],
     ) {
         let actual_stream = actual_stream_handler.stream().await;
 
-        // we need to limit the stream to `expected_writes.len()` elements, otherwise it might be pending forever
+        // we need to limit the stream to `expected_writes.len()` elements, otherwise it might be
+        // pending forever
         let actual_writes: Vec<_> = actual_stream
             .take(expected_writes.len())
             .try_collect()
@@ -955,6 +1041,7 @@ pub mod test_utils {
     /// Asserts that given span contexts are the same or that `second` links back to `first`.
     ///
     /// "Same" means:
+    ///
     /// - identical trace ID
     /// - identical span ID
     /// - identical parent span ID
@@ -983,7 +1070,8 @@ pub mod test_utils {
         assert_eq!(first.parent_span_id, second.parent_span_id);
     }
 
-    /// Assert that all span relations (parents, links) are found within the set of spans or within the set of roots.
+    /// Assert that all span relations (parents, links) are found within the set of spans or within
+    /// the set of roots.
     fn assert_span_relations_closed(spans: &[Span], roots: &[SpanContext]) {
         let all_ids: HashSet<_> = spans
             .iter()
@@ -1003,7 +1091,8 @@ pub mod test_utils {
     /// Assert that given stream is pending.
     ///
-    /// This will will try to poll the stream for a bit to ensure that async IO has a chance to catch up.
+    /// This will try to poll the stream for a bit to ensure that async IO has a chance to
+    /// catch up.
     async fn assert_stream_pending<S>(stream: &mut S)
     where
         S: Stream + Send + Unpin,
@@ -1063,8 +1152,8 @@ pub mod test_utils {
             }
             (false, None) => {
                 eprintln!(
-                    "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
-                     run"
+                    "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT \
+                     to run"
                 );
                 return;
             }
diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs
index 2d94a6cdd1..a9b5ebd76e 100644
--- a/write_buffer/src/file.rs
+++ b/write_buffer/src/file.rs
@@ -285,7 +285,7 @@ pub struct FileBufferStreamHandler {
 
 #[async_trait]
 impl WriteBufferStreamHandler for FileBufferStreamHandler {
-    async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
+    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
         let committed = self.path.join("committed");
 
         ConsumerStream::new(
@@ -304,6 +304,11 @@ impl WriteBufferStreamHandler for FileBufferStreamHandler {
         self.terminated.store(false, Ordering::SeqCst);
         Ok(())
     }
+
+    fn reset_to_earliest(&mut self) {
+        self.next_sequence_number.store(0, Ordering::SeqCst);
+        self.terminated.store(false, Ordering::SeqCst);
+    }
 }
 
 /// File-based write buffer reader.
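Taken together, the new `reset_to_earliest` trait method and the file-based implementation above (the Kafka and mock versions follow below) let a consumer recover when its offset has been evicted by retention. As a rough, hypothetical sketch of the consumer side (not code from this diff; it assumes `WriteBufferError` exposes its kind via a `kind()` method):

// Illustrative consumer sketch, not part of this diff. Assumed imports:
//   use futures::StreamExt;
//   use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};

async fn consume(mut handler: Box<dyn WriteBufferStreamHandler>) {
    loop {
        // `stream()` returns a `BoxStream<'static, _>` after this change, so
        // the handler itself stays usable while the stream is alive.
        let mut stream = handler.stream().await;
        while let Some(res) = stream.next().await {
            match res {
                Ok(_op) => { /* hand the DmlOperation to the sink here */ }
                // Assumption: `WriteBufferError` exposes its kind via `kind()`.
                Err(e) if matches!(e.kind(), WriteBufferErrorKind::UnknownSequenceNumber) => {
                    // Our offset was evicted by retention: jump to the oldest
                    // retained record instead of failing hard.
                    handler.reset_to_earliest();
                    break; // the errored stream has ended; build a fresh one
                }
                Err(e) => panic!("unrecoverable write buffer error: {e}"),
            }
        }
    }
}

Note that with the previous `BoxStream<'_, _>` signature this loop would not have compiled: the stream borrowed the handler, so `reset_to_earliest` could not be called until the stream was dropped.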
diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs
index 97523eb9a1..dbdbec8d8e 100644
--- a/write_buffer/src/kafka/mod.rs
+++ b/write_buffer/src/kafka/mod.rs
@@ -130,7 +130,7 @@ pub struct RSKafkaStreamHandler {
 
 #[async_trait]
 impl WriteBufferStreamHandler for RSKafkaStreamHandler {
-    async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
+    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
         if self.terminated.load(Ordering::SeqCst) {
             return futures::stream::empty().boxed();
         }
@@ -161,6 +161,8 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
         }
         let stream = stream_builder.build();
 
+        let sequencer_id = self.sequencer_id;
+
         let stream = stream.map(move |res| {
             let (record, _watermark) = match res {
                 Ok(x) => x,
@@ -185,7 +187,7 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
                 IoxHeaders::from_headers(record.record.headers, trace_collector.as_ref())?;
 
             let sequence = Sequence {
-                sequencer_id: self.sequencer_id,
+                sequencer_id,
                 sequence_number: record
                     .offset
                     .try_into()
@@ -220,6 +222,11 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
         self.terminated.store(false, Ordering::SeqCst);
         Ok(())
     }
+
+    fn reset_to_earliest(&mut self) {
+        *self.next_offset.lock() = None;
+        self.terminated.store(false, Ordering::SeqCst);
+    }
 }
 
 #[derive(Debug)]
diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs
index bcb6de785c..d595670657 100644
--- a/write_buffer/src/mock.rs
+++ b/write_buffer/src/mock.rs
@@ -11,7 +11,10 @@ use parking_lot::Mutex;
 use std::{
     collections::{BTreeMap, BTreeSet},
     num::NonZeroU32,
-    sync::Arc,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
+        Arc,
+    },
     task::{Poll, Waker},
 };
 
@@ -70,7 +73,8 @@ pub struct MockBufferSharedState {
 impl MockBufferSharedState {
     /// Create new shared state w/ N sequencers.
     ///
-    /// This is equivalent to [`uninitialized`](Self::uninitialized) followed by [`init`](Self::init).
+    /// This is equivalent to [`uninitialized`](Self::uninitialized) followed by
+    /// [`init`](Self::init).
     pub fn empty_with_n_sequencers(n_sequencers: NonZeroU32) -> Self {
         let state = Self::uninitialized();
         state.init(n_sequencers);
@@ -87,6 +91,7 @@ impl MockBufferSharedState {
     /// Initialize shared state w/ N sequencers.
     ///
     /// # Panics
+    ///
     /// - when state is already initialized
     pub fn init(&self, n_sequencers: NonZeroU32) {
         let mut guard = self.writes.lock();
@@ -107,6 +112,7 @@ impl MockBufferSharedState {
     /// Push a new delete to the specified sequencer
     ///
     /// # Panics
+    ///
     /// - when delete is not sequenced
     /// - when no sequencer was initialized
     /// - when specified sequencer does not exist
@@ -118,6 +124,7 @@ impl MockBufferSharedState {
     /// Push a new entry to the specified sequencer.
     ///
     /// # Panics
+    ///
     /// - when write is not sequenced
     /// - when no sequencer was initialized
     /// - when specified sequencer does not exist
@@ -129,6 +136,7 @@ impl MockBufferSharedState {
     /// Push a new operation to the specified sequencer
     ///
     /// # Panics
+    ///
     /// - when operation is not sequenced
     /// - when no sequencer was initialized
     /// - when specified sequencer does not exist
@@ -168,6 +176,7 @@ impl MockBufferSharedState {
     /// Push error to specified sequencer.
     ///
     /// # Panics
+    ///
     /// - when no sequencer was initialized
     /// - when sequencer does not exist
     pub fn push_error(&self, error: WriteBufferError, sequencer_id: u32) {
@@ -183,6 +192,7 @@ impl MockBufferSharedState {
     /// Get messages (entries and errors) for specified sequencer.
     ///
     /// # Panics
+    ///
     /// - when no sequencer was initialized
     /// - when sequencer does not exist
     pub fn get_messages(&self, sequencer_id: u32) -> Vec<Result<DmlOperation, WriteBufferError>> {
@@ -203,6 +213,7 @@ impl MockBufferSharedState {
     /// Provides a way to wipe messages (e.g. to simulate retention periods in Kafka)
     ///
     /// # Panics
+    ///
     /// - when no sequencer was initialized
     /// - when sequencer does not exist
     pub fn clear_messages(&self, sequencer_id: u32) {
@@ -346,7 +357,7 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
 
 #[derive(Debug)]
 pub struct MockBufferForReading {
-    shared_state: MockBufferSharedState,
+    shared_state: Arc<MockBufferSharedState>,
     n_sequencers: u32,
 }
 
@@ -369,7 +380,7 @@ impl MockBufferForReading {
         };
 
         Ok(Self {
-            shared_state: state,
+            shared_state: Arc::new(state),
             n_sequencers,
         })
     }
@@ -379,45 +390,55 @@ impl MockBufferForReading {
 #[derive(Debug)]
 pub struct MockBufferStreamHandler {
     /// Shared state.
-    shared_state: MockBufferSharedState,
+    shared_state: Arc<MockBufferSharedState>,
 
     /// Own sequencer ID.
     sequencer_id: u32,
 
     /// Index within the entry vector.
-    vector_index: usize,
+    vector_index: Arc<AtomicUsize>,
 
     /// Offset within the sequencer IDs.
     offset: u64,
 
     /// Flags if the stream is terminated, e.g. due to "offset out of range"
-    terminated: bool,
+    terminated: Arc<AtomicBool>,
 }
 
 #[async_trait]
 impl WriteBufferStreamHandler for MockBufferStreamHandler {
-    async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
-        futures::stream::poll_fn(|cx| {
-            if self.terminated {
+    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
+        // Don't reference `self` in the closure, move these instead
+        let terminated = Arc::clone(&self.terminated);
+        let shared_state = Arc::clone(&self.shared_state);
+        let sequencer_id = self.sequencer_id;
+        let vector_index = Arc::clone(&self.vector_index);
+        let offset = self.offset;
+
+        futures::stream::poll_fn(move |cx| {
+            if terminated.load(SeqCst) {
                 return Poll::Ready(None);
             }
 
-            let mut guard = self.shared_state.writes.lock();
+            let mut guard = shared_state.writes.lock();
             let writes = guard.as_mut().unwrap();
-            let writes_vec = writes.get_mut(&self.sequencer_id).unwrap();
+            let writes_vec = writes.get_mut(&sequencer_id).unwrap();
 
             let entries = &writes_vec.writes;
-            while entries.len() > self.vector_index {
-                let write_result = &entries[self.vector_index];
+            let mut vi = vector_index.load(SeqCst);
+            while entries.len() > vi {
+                let write_result = &entries[vi];
 
                 // consume entry
                 // (`fetch_update` returns the *previous* value, so add 1 to step
                 // past the entry that was just read)
-                self.vector_index += 1;
+                vi = vector_index
+                    .fetch_update(SeqCst, SeqCst, |n| Some(n + 1))
+                    .unwrap()
+                    + 1;
 
                 match write_result {
                     Ok(write) => {
                         // found an entry => need to check if it is within the offset
                         let sequence = write.meta().sequence().unwrap();
-                        if sequence.sequence_number >= self.offset {
+                        if sequence.sequence_number >= offset {
                             // within offset => return entry to caller
                             return Poll::Ready(Some(Ok(write.clone())));
                         } else {
@@ -432,7 +453,7 @@ impl WriteBufferStreamHandler for MockBufferStreamHandler {
                 }
             }
 
-            // check if we have seeked to far
+            // check if we have seeked too far
             let next_offset = entries
                 .iter()
                 .filter_map(|write_result| {
@@ -446,8 +467,8 @@ impl WriteBufferStreamHandler for MockBufferStreamHandler {
                 .max()
                 .map(|x| x + 1)
                 .unwrap_or_default();
-            if self.offset > next_offset {
-                self.terminated = true;
+            if offset > next_offset {
+                terminated.store(true, SeqCst);
                 return Poll::Ready(Some(Err(WriteBufferError::unknown_sequence_number(
                     format!("unknown sequence number, high watermark is {next_offset}"),
                 ))));
@@ -464,13 +485,19 @@ impl WriteBufferStreamHandler for MockBufferStreamHandler {
         self.offset = sequence_number;
 
         // reset position to start since seeking might go backwards
-        self.vector_index = 0;
+        self.vector_index.store(0, SeqCst);
 
         // reset termination state
-        self.terminated = false;
+        self.terminated.store(false, SeqCst);
 
         Ok(())
     }
+
+    fn reset_to_earliest(&mut self) {
+        self.offset = 0;
+        self.vector_index.store(0, SeqCst);
+        self.terminated.store(false, SeqCst);
+    }
 }
 
 #[async_trait]
@@ -478,6 +505,7 @@ impl WriteBufferReading for MockBufferForReading {
     fn sequencer_ids(&self) -> BTreeSet<u32> {
         (0..self.n_sequencers).into_iter().collect()
     }
+
     async fn stream_handler(
         &self,
         sequencer_id: u32,
@@ -487,11 +515,11 @@ impl WriteBufferReading for MockBufferForReading {
         }
 
         Ok(Box::new(MockBufferStreamHandler {
-            shared_state: self.shared_state.clone(),
+            shared_state: Arc::clone(&self.shared_state),
             sequencer_id,
-            vector_index: 0,
+            vector_index: Arc::new(AtomicUsize::new(0)),
             offset: 0,
-            terminated: false,
+            terminated: Arc::new(AtomicBool::new(false)),
         }))
     }
 
@@ -521,7 +549,7 @@ pub struct MockStreamHandlerThatAlwaysErrors;
 
 #[async_trait]
 impl WriteBufferStreamHandler for MockStreamHandlerThatAlwaysErrors {
-    async fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, WriteBufferError>> {
+    async fn stream(&mut self) -> BoxStream<'static, Result<DmlOperation, WriteBufferError>> {
         futures::stream::poll_fn(|_cx| {
             Poll::Ready(Some(Err(String::from(
                 "Something bad happened while reading from stream",
@@ -534,6 +562,10 @@ impl WriteBufferStreamHandler for MockStreamHandlerThatAlwaysErrors {
     async fn seek(&mut self, _sequence_number: u64) -> Result<(), WriteBufferError> {
         Err(String::from("Something bad happened while seeking the stream").into())
     }
+
+    fn reset_to_earliest(&mut self) {
+        // Intentionally left blank
+    }
 }
 
 #[async_trait]
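A general pattern is worth noting in the mock rework above: a `BoxStream<'static, _>` cannot borrow from `self`, so every piece of state the poll closure needs is either copied in (`sequencer_id`, `offset`) or moved in as an `Arc` clone (`shared_state`, `vector_index`, `terminated`). The `Arc`-backed parts remain visible to the handler, which is what lets `seek` and `reset_to_earliest` affect streams that already exist. A minimal, self-contained sketch of that pattern (illustrative names, not code from this diff):

// Standalone sketch of the "owned state => 'static stream" pattern.
use std::sync::{
    atomic::{AtomicUsize, Ordering::SeqCst},
    Arc,
};
use std::task::Poll;

use futures::stream::{BoxStream, StreamExt};

struct Handler {
    // Shared with every stream handed out, so resets on the handler are
    // observed by live streams on their next poll.
    position: Arc<AtomicUsize>,
}

impl Handler {
    fn stream(&mut self, limit: usize) -> BoxStream<'static, usize> {
        // Move an `Arc` clone (not `&self`) into the closure => 'static stream.
        let position = Arc::clone(&self.position);
        futures::stream::poll_fn(move |_cx| {
            let pos = position.load(SeqCst);
            if pos < limit {
                position.store(pos + 1, SeqCst);
                Poll::Ready(Some(pos))
            } else {
                Poll::Ready(None)
            }
        })
        .boxed()
    }

    /// Rewind every stream created by this handler (mirrors `reset_to_earliest`).
    fn reset(&self) {
        self.position.store(0, SeqCst);
    }
}

One caveat the mock illustrates as well: state captured by value (such as `offset`) is frozen at stream-creation time, so changes to it only take effect for streams created afterwards; only the `Arc`-backed atomics are seen by streams that are already running.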