diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index 524da841d9..7c07a1b408 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -81,13 +81,11 @@ pub(crate) struct NamespaceData { /// [`PartitionData`]: super::partition::PartitionData partition_provider: Arc, - /// A counter tracking the approximate number of partitions currently - /// buffered. + /// A counter tracking the number of non-empty partitions currently + /// buffered for this namespace. /// - /// This counter is NOT atomically incremented w.r.t creation of the - /// partitions it tracks, and therefore is susceptible to "overrun", - /// breaching the configured partition count limit by a relatively small - /// degree. + /// This counter is eventually consistent / relaxed when read, but strongly + /// consistent when enforced. partition_count: Arc, post_write_observer: Arc, diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index 70c953f92f..a265a107fe 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -12,10 +12,11 @@ use schema::{merge::SchemaMerger, sort::SortKey, Schema}; use self::{ buffer::{traits::Queryable, DataBuffer}, + counter::PartitionCounter, persisting::{BatchIdent, PersistingData}, persisting_list::PersistingList, }; -use super::{namespace::NamespaceName, table::metadata::TableMetadata}; +use super::{namespace::NamespaceName, table::metadata::TableMetadata, BufferWriteError}; use crate::{ deferred_load::DeferredLoad, query::projection::OwnedProjection, query_adaptor::QueryAdaptor, }; @@ -93,6 +94,13 @@ pub struct PartitionData { /// / deferred. table: Arc>, + /// An optimised empty indicator that returns true when the `buffer` and + /// `persisting` list are empty: a subsequent query will not return any + /// rows. + /// + /// False when this partition contains data. 
+ is_empty: bool, + /// A [`DataBuffer`] for incoming writes. buffer: DataBuffer, @@ -112,6 +120,15 @@ pub struct PartitionData { /// The number of persist operations completed over the lifetime of this /// [`PartitionData`]. completed_persistence_count: u64, + + /// A counter tracking the number of non-empty partitions per namespace. + /// + /// This value is incremented when this [`PartitionData`] transitions from + /// empty, to non-empty during write buffering. Likewise the value is + /// decremented when persistence is marked as complete and the persisted + /// data is dropped, transitioning the [`PartitionData`] from non-empty to + /// empty. + partition_counter: Arc, } impl PartitionData { @@ -125,6 +142,7 @@ impl PartitionData { table_id: TableId, table: Arc>, sort_key: SortKeyState, + partition_counter: Arc, ) -> Self { Self { partition_id, @@ -138,6 +156,8 @@ impl PartitionData { persisting: PersistingList::default(), started_persistence_count: BatchIdent::default(), completed_persistence_count: 0, + partition_counter, + is_empty: true, } } @@ -146,10 +166,28 @@ impl PartitionData { &mut self, mb: MutableBatch, sequence_number: SequenceNumber, - ) -> Result<(), mutable_batch::Error> { + ) -> Result<(), BufferWriteError> { + if self.is_empty() { + // This partition is transitioning from empty, to non-empty. + // + // Because non-empty partitions are bounded per namespace, a check + // must be made to ensure accepting this write does not exceed the + // (approximate) limit. + self.partition_counter.inc()?; + self.is_empty = false; + } + + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + debug_assert_ne!(self.partition_counter.read(), 0); + // Buffer the write. self.buffer.buffer_write(mb, sequence_number)?; + // Invariant: if the partition contains a buffered write, it must report + // non-empty. 
+ debug_assert!(!self.is_empty()); + trace!( namespace_id = %self.namespace_id, table_id = %self.table_id, @@ -187,7 +225,16 @@ impl PartitionData { /// Returns true if this partition contains no data (buffered or /// persisting). pub(crate) fn is_empty(&self) -> bool { - self.persisting.is_empty() && self.buffer.rows() == 0 + // Assert the optimised bool is in-sync with the actual state. + debug_assert_eq!( + self.persisting.is_empty() && self.buffer.rows() == 0, + self.is_empty + ); + + // Optimisation: instead of inspecting the persisting list, and + // inspecting the row count of the data buffer, the empty state can be + // correctly and cheaply tracked separately. + self.is_empty } /// Return the timestamp min/max values for the data contained within this @@ -271,6 +318,13 @@ impl PartitionData { return None; } + // Invariant: the number of rows is always > 0 at this point. + debug_assert_ne!(self.rows(), 0); + + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + debug_assert_ne!(self.partition_counter.read(), 0); + // Construct the query adaptor over the partition data. // // `data` MUST contain at least one row, or the constructor panics. This @@ -322,6 +376,10 @@ impl PartitionData { // From this point on, all code MUST be infallible or the buffered data // contained within persisting may be dropped. + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + assert!(self.partition_counter.read() > 0); + // Increment the "started persist" counter. // // This is used to cheaply identify batches given to the @@ -351,6 +409,13 @@ impl PartitionData { // order). self.persisting.push(batch_ident, fsm); + // Invariant: the partition must not be marked as empty when there's an + // entry in the persisting list. + debug_assert!(!self.is_empty()); + + // Invariant: the number of rows is always > 0 at this point. 
+ debug_assert_ne!(self.rows(), 0); + Some(data) } @@ -364,6 +429,10 @@ impl PartitionData { /// This method panics if [`Self`] is not marked as undergoing a persist /// operation, or `batch` is not currently being persisted. pub(crate) fn mark_persisted(&mut self, batch: PersistingData) -> SequenceNumberSet { + // Invariant: the partition must not be marked as empty when marking + // persisting data as complete. + debug_assert!(!self.is_empty()); + let fsm = self.persisting.remove(batch.batch_ident()); self.completed_persistence_count += 1; @@ -379,6 +448,22 @@ impl PartitionData { "marking partition persistence complete" ); + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + assert!(self.partition_counter.read() > 0); + + // If this partition is now empty, the buffered partition counter should + // be decremented to return the permit to the per-namespace pool. + // + // Check the actual partition data content (rather than relying on + // is_empty bool) as the list was modified above and the bool state may + // need updating. + if self.persisting.is_empty() && self.buffer.rows() == 0 { + // This partition is transitioning from non-empty to empty. + self.partition_counter.dec(); + self.is_empty = true; + } + // Return the set of IDs this buffer contained. fsm.into_sequence_number_set() } diff --git a/ingester/src/buffer_tree/partition/counter.rs b/ingester/src/buffer_tree/partition/counter.rs index 4efa09e6a8..1870f8bb91 100644 --- a/ingester/src/buffer_tree/partition/counter.rs +++ b/ingester/src/buffer_tree/partition/counter.rs @@ -3,10 +3,14 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; +use crate::buffer_tree::BufferWriteError; + /// A counter typed for counting partitions and applying a configured limit. /// -/// No ordering is guaranteed - increments and reads may be arbitrarily -/// reordered w.r.t other memory operations (relaxed ordering). 
+/// This value is completely unsynchronised (in the context of memory ordering) +/// and relies on external thread synchronisation. Calls to change the value of +/// this counter may be arbitrarily reordered with other memory operations - the +/// caller is responsible for providing external ordering where necessary. #[derive(Debug)] pub(crate) struct PartitionCounter { current: AtomicUsize, @@ -22,22 +26,51 @@ impl PartitionCounter { } /// Increment the counter by 1. - pub(crate) fn inc(&self) { - self.current.fetch_add(1, Ordering::Relaxed); + /// + /// Return [`Err`] if the configured limit has been reached, else returns + /// [`Ok`] if the current value is below the limit. + pub(crate) fn inc(&self) -> Result<(), BufferWriteError> { + if self.current.fetch_add(1, Ordering::Relaxed) >= self.max { + // The limit was exceeded, roll back the addition and return an + // error. + let count = self.current.fetch_sub(1, Ordering::Relaxed); + return Err(BufferWriteError::PartitionLimit { count: count - 1 }); + } + + Ok(()) + } + + /// Decrement the counter by 1. + /// + /// # Panics + /// + /// This method MAY panic if the counter wraps around - never decrement more + /// than previously incremented. + pub(crate) fn dec(&self) { + let v = self.current.fetch_sub(1, Ordering::Relaxed); + + // Correctness: the fetch operations are RMW, which are guaranteed to + // see a monotonic history, therefore any prior fetch_add always happens + // before the fetch_sub above. + debug_assert_ne!(v, usize::MAX); // MUST never wrap around. NOTE(review): fetch_sub returns the previous value, so an underflow yields v == 0, not usize::MAX - confirm this check. + } + + /// Returns an error if the limit has been reached and a subsequent + /// increment would fail. + pub(crate) fn is_maxed(&self) -> Result<(), BufferWriteError> { + let count = self.read(); + if count >= self.max { + return Err(BufferWriteError::PartitionLimit { count }); + } + + Ok(()) } /// Read the approximate counter value. - /// - /// Reads may return stale values, but will always be monotonic. 
pub(crate) fn read(&self) -> usize { self.current.load(Ordering::Relaxed) } - /// Return `true` if the configured limit has been reached. - pub(crate) fn is_maxed(&self) -> bool { - self.read() >= self.max - } - #[cfg(test)] pub(crate) fn set(&self, v: usize) { self.current.store(v, Ordering::Relaxed) @@ -48,6 +81,8 @@ impl PartitionCounter { mod tests { use super::*; + use assert_matches::assert_matches; + #[test] fn test_counter() { const N: usize = 100; @@ -55,12 +90,23 @@ mod tests { let c = PartitionCounter::new(NonZeroUsize::new(N).unwrap()); for v in 0..N { - assert!(!c.is_maxed()); assert_eq!(c.read(), v); - c.inc(); + assert_matches!(c.is_maxed(), Ok(())); + assert!(c.inc().is_ok()); } assert_eq!(c.read(), N); - assert!(c.is_maxed()); + assert_matches!( + c.is_maxed(), + Err(BufferWriteError::PartitionLimit { count: N }) + ); + assert_matches!(c.inc(), Err(BufferWriteError::PartitionLimit { count: N })); + assert_eq!(c.read(), N); // Counter value unchanged. + + c.dec(); + assert_eq!(c.read(), N - 1); + assert_matches!(c.is_maxed(), Ok(())); + assert_matches!(c.inc(), Ok(())); + assert_eq!(c.read(), N); } } diff --git a/ingester/src/buffer_tree/partition/resolver/cache.rs b/ingester/src/buffer_tree/partition/resolver/cache.rs index c7e47b9878..395f0355a5 100644 --- a/ingester/src/buffer_tree/partition/resolver/cache.rs +++ b/ingester/src/buffer_tree/partition/resolver/cache.rs @@ -11,7 +11,9 @@ use super::r#trait::PartitionProvider; use crate::{ buffer_tree::{ namespace::NamespaceName, - partition::{resolver::SortKeyResolver, PartitionData, SortKeyState}, + partition::{ + counter::PartitionCounter, resolver::SortKeyResolver, PartitionData, SortKeyState, + }, table::metadata::TableMetadata, }, deferred_load::DeferredLoad, @@ -168,6 +170,7 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { // Use the cached PartitionKey instead of the caller's partition_key, // instead preferring to reuse the already-shared 
Arc in the cache. @@ -199,6 +202,7 @@ where table_id, table, SortKeyState::Deferred(Arc::new(sort_key_resolver)), + partition_counter, ))); } @@ -206,7 +210,14 @@ where // Otherwise delegate to the catalog / inner impl. self.inner - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await } } @@ -216,6 +227,8 @@ mod tests { // Harmless in tests - saves a bunch of extra vars. #![allow(clippy::await_holding_lock)] + use std::num::NonZeroUsize; + use data_types::SortedColumnSet; use iox_catalog::mem::MemCatalog; @@ -259,6 +272,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; @@ -302,6 +316,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; @@ -359,6 +374,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; @@ -394,6 +410,7 @@ mod tests { defer_namespace_name_1_sec(), other_table, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index a540766c2b..facdf5b7a2 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -15,7 +15,8 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{ - resolver::build_sort_key_from_sort_key_ids_and_columns, PartitionData, SortKeyState, + counter::PartitionCounter, resolver::build_sort_key_from_sort_key_ids_and_columns, + PartitionData, SortKeyState, }, 
table::metadata::TableMetadata, }, @@ -76,6 +77,7 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { debug!( %partition_key, @@ -119,6 +121,7 @@ impl PartitionProvider for CatalogPartitionResolver { table_id, table, SortKeyState::Provided(sort_key, p_sort_key_ids), + partition_counter, ))) } } @@ -128,7 +131,7 @@ mod tests { // Harmless in tests - saves a bunch of extra vars. #![allow(clippy::await_holding_lock)] - use std::{sync::Arc, time::Duration}; + use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use assert_matches::assert_matches; use iox_catalog::{ @@ -181,6 +184,7 @@ mod tests { }, &metrics, )), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; diff --git a/ingester/src/buffer_tree/partition/resolver/coalesce.rs b/ingester/src/buffer_tree/partition/resolver/coalesce.rs index e08c8d31c0..2b170b2e36 100644 --- a/ingester/src/buffer_tree/partition/resolver/coalesce.rs +++ b/ingester/src/buffer_tree/partition/resolver/coalesce.rs @@ -14,7 +14,9 @@ use parking_lot::Mutex; use crate::{ buffer_tree::{ - namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata, + namespace::NamespaceName, + partition::{counter::PartitionCounter, PartitionData}, + table::metadata::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -147,6 +149,7 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { let key = Key { table_id, @@ -170,6 +173,7 @@ where namespace_name, table_id, table, + partition_counter, )); // Make the future poll-able by many callers, all of which @@ -233,6 +237,7 @@ async fn do_fetch( namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> where T: PartitionProvider + 'static, @@ -247,7 +252,14 @@ where // (which would cause the connection to be returned). 
tokio::spawn(async move { inner - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await }) .await @@ -258,6 +270,7 @@ where mod tests { use std::{ future, + num::NonZeroUsize, sync::Arc, task::{Context, Poll}, time::Duration, @@ -300,6 +313,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) }) .collect::>() @@ -334,6 +348,7 @@ mod tests { _namespace_name: Arc>, _table_id: TableId, _table: Arc>, + _partition_counter: Arc, ) -> core::pin::Pin< Box< dyn core::future::Future>> @@ -376,6 +391,7 @@ mod tests { Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, Arc::clone(&table_loader), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ); let pa_2 = layer.get_partition( ARBITRARY_PARTITION_KEY.clone(), @@ -383,6 +399,7 @@ mod tests { Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, Arc::clone(&table_loader), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ); let waker = futures::task::noop_waker(); @@ -403,6 +420,7 @@ mod tests { namespace_loader, ARBITRARY_TABLE_ID, table_loader, + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .with_timeout_panic(Duration::from_secs(5)) .await; @@ -433,6 +451,7 @@ mod tests { _namespace_name: Arc>, _table_id: TableId, _table: Arc>, + _partition_counter: Arc, ) -> Arc> { let waker = self.wait.notified(); let permit = self.sem.acquire().await.unwrap(); @@ -473,6 +492,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ); let waker = futures::task::noop_waker(); diff --git a/ingester/src/buffer_tree/partition/resolver/mock.rs b/ingester/src/buffer_tree/partition/resolver/mock.rs index f40da05a81..0fd2679353 100644 --- 
a/ingester/src/buffer_tree/partition/resolver/mock.rs +++ b/ingester/src/buffer_tree/partition/resolver/mock.rs @@ -9,7 +9,9 @@ use parking_lot::Mutex; use super::r#trait::PartitionProvider; use crate::{ buffer_tree::{ - namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata, + namespace::NamespaceName, + partition::{counter::PartitionCounter, PartitionData}, + table::metadata::TableMetadata, }, deferred_load::DeferredLoad, test_util::PartitionDataBuilder, @@ -57,6 +59,7 @@ impl PartitionProvider for MockPartitionProvider { namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { let p = self .partitions @@ -68,14 +71,17 @@ impl PartitionProvider for MockPartitionProvider { let mut p = p.with_namespace_id(namespace_id); - // If the test provided a namespace/table loader, use it, otherwise default to the - // one provided in this call. + // If the test provided a namespace/table loader, use it, otherwise + // default to the one provided in this call. 
if p.namespace_loader().is_none() { p = p.with_namespace_loader(namespace_name); } if p.table_loader().is_none() { p = p.with_table_loader(table); } + if p.partition_counter().is_none() { + p = p.with_partition_counter(partition_counter); + } Arc::new(Mutex::new(p.build())) } diff --git a/ingester/src/buffer_tree/partition/resolver/old_filter.rs b/ingester/src/buffer_tree/partition/resolver/old_filter.rs index 2276fd8a34..0c7ce9d0b4 100644 --- a/ingester/src/buffer_tree/partition/resolver/old_filter.rs +++ b/ingester/src/buffer_tree/partition/resolver/old_filter.rs @@ -14,7 +14,9 @@ use super::PartitionProvider; use crate::{ buffer_tree::{ namespace::NamespaceName, - partition::{resolver::SortKeyResolver, PartitionData, SortKeyState}, + partition::{ + counter::PartitionCounter, resolver::SortKeyResolver, PartitionData, SortKeyState, + }, table::metadata::TableMetadata, }, deferred_load::DeferredLoad, @@ -148,6 +150,7 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { let hash_id = PartitionHashId::new(table_id, &partition_key); @@ -189,6 +192,7 @@ where table_id, table, SortKeyState::Deferred(Arc::new(sort_key_resolver)), + partition_counter, ))); } @@ -202,14 +206,21 @@ where // This partition MAY be an old-style / row-ID-addressed partition // that needs querying for. 
self.inner - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await } } #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{num::NonZeroUsize, sync::Arc}; use data_types::PartitionId; use hashbrown::HashMap; @@ -248,6 +259,7 @@ mod tests { _namespace_name: Arc>, table_id: TableId, _table: Arc>, + _partition_counter: Arc, ) -> Arc> { let mut builder = PartitionDataBuilder::default(); @@ -366,7 +378,8 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), p.table_id, - defer_table_metadata_1_sec() + defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), )); let got_id = got.lock().partition_id().clone(); @@ -417,6 +430,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; diff --git a/ingester/src/buffer_tree/partition/resolver/trait.rs b/ingester/src/buffer_tree/partition/resolver/trait.rs index 7ba0e8ee1d..8ff58003e4 100644 --- a/ingester/src/buffer_tree/partition/resolver/trait.rs +++ b/ingester/src/buffer_tree/partition/resolver/trait.rs @@ -6,7 +6,9 @@ use parking_lot::Mutex; use crate::{ buffer_tree::{ - namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata, + namespace::NamespaceName, + partition::{counter::PartitionCounter, PartitionData}, + table::metadata::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -27,6 +29,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug { namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc>; } @@ -42,16 +45,24 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { (**self) - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + 
namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await } } #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{num::NonZeroUsize, sync::Arc}; use super::*; use crate::{ @@ -81,6 +92,7 @@ mod tests { Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, Arc::clone(&table_loader), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; assert_eq!( diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 253cb2a168..e24c3aa654 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -277,6 +277,7 @@ mod tests { table::{metadata::TableMetadata, metadata_resolver::mock::MockTableProvider}, }, deferred_load::{self, DeferredLoad}, + persist::{drain_buffer::persist_partitions, queue::mock::MockPersistQueue}, query::partition_response::PartitionResponse, test_util::{ defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, @@ -821,24 +822,27 @@ mod tests { async fn test_write_query_partition_limit_enforced() { maybe_start_logging(); + let other_table_id = TableId::new(ARBITRARY_TABLE_ID.get() + 1); + let partition_provider = Arc::new( MockPartitionProvider::default() .with_partition( PartitionDataBuilder::new().with_partition_key(ARBITRARY_PARTITION_KEY.clone()), ) .with_partition( - PartitionDataBuilder::new().with_partition_key(PARTITION2_KEY.clone()), + PartitionDataBuilder::new() + .with_partition_key(PARTITION2_KEY.clone()) + .with_table_id(other_table_id), ), ); let table_provider = Arc::clone(&*ARBITRARY_TABLE_PROVIDER); - let partition_count_limit = 1; let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), table_provider, partition_provider, - NonZeroUsize::new(partition_count_limit).unwrap(), + NonZeroUsize::new(1).unwrap(), Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), ); @@ -861,23 +865,69 @@ mod tests { .expect("failed to perform first write"); // Second write to 
a second table should hit the limit and be rejected - let err = buf - .apply(IngestOp::Write(make_write_op( - &PARTITION2_KEY, - ARBITRARY_NAMESPACE_ID, - "BANANASareDIFFERENT", - TableId::new(ARBITRARY_TABLE_ID.get() + 1), - 0, - "BANANASareDIFFERENT,region=Asturias temp=35 4242424242", - None, - ))) - .await - .expect_err("limit should be enforced"); + { + let err = buf + .apply(IngestOp::Write(make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + "BANANASareDIFFERENT", + other_table_id, + 0, + "BANANASareDIFFERENT,region=Asturias temp=35 4242424242", + None, + ))) + .await + .expect_err("limit should be enforced"); - assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 }); + assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 }); + } - // Only one partition should exist (second was rejected) + // Only one partition should exist (second was rejected and not created) assert_eq!(buf.partitions().count(), 1); + + // Persisting all the data in the buffered partition should allow the + // second partition to be created and written to. + persist_partitions(buf.partitions(), &Arc::new(MockPersistQueue::default())).await; + + // The second write for the second partition must now be accepted. + // (This is the same write that was rejected above while the limit was hit.) + buf.apply(IngestOp::Write(make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + "BANANASareDIFFERENT", + other_table_id, + 0, + "BANANASareDIFFERENT,region=Asturias temp=35 4242424242", + None, + ))) + .await + .expect("failed to perform write after persist"); + + // There should now be 2 partitions. + assert_eq!(buf.partitions().count(), 2); + + // And writing to the first, existing partition should fail, as the + // non-empty partition limit has been reached. 
+ { + let err = buf + .apply(IngestOp::Write(make_write_op( + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 0, + format!( + r#"{},region=Asturias temp=35 4242424242"#, + &*ARBITRARY_TABLE_NAME + ) + .as_str(), + None, + ))) + .await + .expect_err("limit should be enforced"); + + assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 }); + } } /// Ensure partition pruning during query execution also prunes metadata diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 4be49b8aa0..062d9b6f0f 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -39,8 +39,6 @@ use crate::{ query_adaptor::QueryAdaptor, }; -const MAX_NAMESPACE_PARTITION_COUNT: usize = usize::MAX; - /// Data of a Table in a given Namespace #[derive(Debug)] pub(crate) struct TableData { @@ -58,13 +56,11 @@ pub(crate) struct TableData { // Map of partition key to its data partition_data: ArcMap>, - /// A counter tracking the approximate number of partitions currently - /// buffered. + /// A counter tracking the number of non-empty partitions currently + /// buffered for the parent namespace. /// - /// This counter is NOT atomically incremented w.r.t creation of the - /// partitions it tracks, and therefore is susceptible to "overrun", - /// breaching the configured partition count limit by a relatively small - /// degree. + /// This counter is eventually consistent / relaxed when read, but strongly + /// consistent when enforced. partition_count: Arc, post_write_observer: Arc, @@ -145,16 +141,15 @@ where let p = self.partition_data.get(&partition_key); let partition_data = match p { Some(p) => p, - None if self.partition_count.is_maxed() => { - // This namespace has exceeded the upper bound on partitions. - // - // This counter is approximate, but monotonic - the count may be - // over the desired limit. 
- return Err(BufferWriteError::PartitionLimit { - count: self.partition_count.read(), - }); - } None => { + // This namespace may have exceeded the upper bound on + // non-empty partitions. + // + // As an optimisation, check if the partition will be writeable + // before creating it, to avoid creating many + // impossible-to-write-to partitions. + self.partition_count.is_maxed()?; + let p = self .partition_provider .get_partition( @@ -163,16 +158,15 @@ where Arc::clone(&self.namespace_name), self.table_id, Arc::clone(&self.catalog_table), + Arc::clone(&self.partition_count), ) .await; + // Add the partition to the map. // // This MAY return a different instance than `p` if another // thread has already initialised the partition. - self.partition_data.get_or_insert_with(&partition_key, || { - self.partition_count.inc(); - p - }) + self.partition_data.get_or_insert_with(&partition_key, || p) } }; @@ -405,12 +399,12 @@ mod tests { #[tokio::test] async fn test_partition_init() { + let partition_counter = Arc::new(PartitionCounter::new(NonZeroUsize::new(42).unwrap())); + // Configure the mock partition provider to return a partition for a table ID. let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(PartitionDataBuilder::new())); - let partition_counter = Arc::new(PartitionCounter::new(NonZeroUsize::new(42).unwrap())); - let table = TableData::new( ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), @@ -445,7 +439,8 @@ mod tests { // Referencing the partition should succeed assert!(table.partition_data.get(&ARBITRARY_PARTITION_KEY).is_some()); - // The partition should have been recorded in the partition count. + // The partition should have been recorded in the "non-empty" partition + // count. 
assert_eq!(partition_counter.read(), 1); } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 5707aec1d5..7c4b570c41 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc, time::Duration}; use data_types::{ partition_template::TablePartitionTemplateOverride, ColumnId, ColumnSet, NamespaceId, @@ -18,7 +18,7 @@ use crate::{ name_resolver::{mock::MockNamespaceNameProvider, NamespaceNameProvider}, NamespaceName, }, - partition::{PartitionData, SortKeyState}, + partition::{counter::PartitionCounter, PartitionData, SortKeyState}, table::{ metadata::{TableMetadata, TableName}, metadata_resolver::{mock::MockTableProvider, TableProvider}, @@ -92,6 +92,7 @@ pub(crate) struct PartitionDataBuilder { table_loader: Option>>, namespace_loader: Option>>, sort_key: Option, + partition_counter: Option>, } impl Default for PartitionDataBuilder { @@ -99,11 +100,12 @@ impl Default for PartitionDataBuilder { Self { table_id: ARBITRARY_TABLE_ID, partition_key: ARBITRARY_PARTITION_KEY.clone(), - partition_id: Default::default(), - namespace_id: Default::default(), - table_loader: Default::default(), - namespace_loader: Default::default(), - sort_key: Default::default(), + partition_id: None, + namespace_id: None, + table_loader: None, + namespace_loader: None, + sort_key: None, + partition_counter: None, } } } @@ -170,6 +172,18 @@ impl PartitionDataBuilder { self } + pub(crate) fn with_partition_counter( + mut self, + partition_counter: Arc, + ) -> Self { + self.partition_counter = Some(partition_counter); + self + } + + pub(crate) fn partition_counter(&self) -> Option> { + self.partition_counter.clone() + } + /// Generate a valid [`PartitionData`] for use in tests where the exact values (or at least /// some of them) don't particularly matter. 
pub(crate) fn build(self) -> PartitionData { @@ -189,6 +203,8 @@ impl PartitionDataBuilder { self.table_id, self.table_loader.unwrap_or_else(defer_table_metadata_1_sec), self.sort_key.unwrap_or(SortKeyState::Provided(None, None)), + self.partition_counter + .unwrap_or_else(|| Arc::new(PartitionCounter::new(NonZeroUsize::MAX))), ) } }