From 7ad26e6d0eafa087a06ca442bbd10a6bf447eb8e Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 14 Sep 2023 14:33:26 +0200 Subject: [PATCH] feat: only limit non-empty partitions This changes the per-namespace buffered partition limiter to only consider non-empty partitions when enforcing the partition limit. Empty partitions cost a small amount of RAM, but are not added to the persist queue - only non-empty partitions will need persisting, so the limiter only needs to limit non-empty partitions. This commit also significantly improves the consistency properties of the limiter - the limit no longer suffers from a small window of "overrun" due to non-atomic updates w.r.t partition creation - the limit is now exact. As an optimisation, partitions are not created at all if the limit has been reached, preventing an accumulation of empty partitions whilst the limit is being enforced. --- ingester/src/buffer_tree/namespace.rs | 10 +- ingester/src/buffer_tree/partition.rs | 91 ++++++++++++++++++- ingester/src/buffer_tree/partition/counter.rs | 74 ++++++++++++--- .../buffer_tree/partition/resolver/cache.rs | 21 ++++- .../buffer_tree/partition/resolver/catalog.rs | 8 +- .../partition/resolver/coalesce.rs | 24 ++++- .../buffer_tree/partition/resolver/mock.rs | 12 ++- .../partition/resolver/old_filter.rs | 22 ++++- .../buffer_tree/partition/resolver/trait.rs | 18 +++- ingester/src/buffer_tree/root.rs | 84 +++++++++++++---- ingester/src/buffer_tree/table.rs | 43 ++++----- ingester/src/test_util.rs | 30 ++++-- 12 files changed, 350 insertions(+), 87 deletions(-) diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index 524da841d9..7c07a1b408 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -81,13 +81,11 @@ pub(crate) struct NamespaceData { /// [`PartitionData`]: super::partition::PartitionData partition_provider: Arc, - /// A counter tracking the approximate number of partitions 
currently - /// buffered. + /// A counter tracking the number of non-empty partitions currently + /// buffered for this namespace. /// - /// This counter is NOT atomically incremented w.r.t creation of the - /// partitions it tracks, and therefore is susceptible to "overrun", - /// breaching the configured partition count limit by a relatively small - /// degree. + /// This counter is eventually consistent / relaxed when read, but strongly + /// consistent when enforced. partition_count: Arc, post_write_observer: Arc, diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index 70c953f92f..a265a107fe 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -12,10 +12,11 @@ use schema::{merge::SchemaMerger, sort::SortKey, Schema}; use self::{ buffer::{traits::Queryable, DataBuffer}, + counter::PartitionCounter, persisting::{BatchIdent, PersistingData}, persisting_list::PersistingList, }; -use super::{namespace::NamespaceName, table::metadata::TableMetadata}; +use super::{namespace::NamespaceName, table::metadata::TableMetadata, BufferWriteError}; use crate::{ deferred_load::DeferredLoad, query::projection::OwnedProjection, query_adaptor::QueryAdaptor, }; @@ -93,6 +94,13 @@ pub struct PartitionData { /// / deferred. table: Arc>, + /// An optimised empty indicator that returns true when the `buffer` and + /// `persisting` list are empty: a subsequent query will not return any + /// rows. + /// + /// False when this partition contains data. + is_empty: bool, + /// A [`DataBuffer`] for incoming writes. buffer: DataBuffer, @@ -112,6 +120,15 @@ pub struct PartitionData { /// The number of persist operations completed over the lifetime of this /// [`PartitionData`]. completed_persistence_count: u64, + + /// A counter tracking the number of non-empty partitions per namespace. 
+ /// + /// This value is incremented when this [`PartitionData`] transitions from + /// empty, to non-empty during write buffering. Likewise the value is + /// decremented when persistence is marked as complete and the persisted + /// data is dropped, transitioning the [`PartitionData`] from non-empty to + /// empty. + partition_counter: Arc, } impl PartitionData { @@ -125,6 +142,7 @@ impl PartitionData { table_id: TableId, table: Arc>, sort_key: SortKeyState, + partition_counter: Arc, ) -> Self { Self { partition_id, @@ -138,6 +156,8 @@ impl PartitionData { persisting: PersistingList::default(), started_persistence_count: BatchIdent::default(), completed_persistence_count: 0, + partition_counter, + is_empty: true, } } @@ -146,10 +166,28 @@ impl PartitionData { &mut self, mb: MutableBatch, sequence_number: SequenceNumber, - ) -> Result<(), mutable_batch::Error> { + ) -> Result<(), BufferWriteError> { + if self.is_empty() { + // This partition is transitioning from empty, to non-empty. + // + // Because non-empty partitions are bounded per namespace, a check + // must be made to ensure accepting this write does not exceed the + // (approximate) limit. + self.partition_counter.inc()?; + self.is_empty = false; + } + + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + debug_assert_ne!(self.partition_counter.read(), 0); + // Buffer the write. self.buffer.buffer_write(mb, sequence_number)?; + // Invariant: if the partition contains a buffered write, it must report + // non-empty. + debug_assert!(!self.is_empty()); + trace!( namespace_id = %self.namespace_id, table_id = %self.table_id, @@ -187,7 +225,16 @@ impl PartitionData { /// Returns true if this partition contains no data (buffered or /// persisting). pub(crate) fn is_empty(&self) -> bool { - self.persisting.is_empty() && self.buffer.rows() == 0 + // Assert the optimised bool is in-sync with the actual state. 
+ debug_assert_eq!( + self.persisting.is_empty() && self.buffer.rows() == 0, + self.is_empty + ); + + // Optimisation: instead of inspecting the persisting list, and + // inspecting the row count of the data buffer, the empty state can be + // correctly and cheaply tracked separately. + self.is_empty } /// Return the timestamp min/max values for the data contained within this @@ -271,6 +318,13 @@ impl PartitionData { return None; } + // Invariant: the number of rows is always > 0 at this point. + debug_assert_ne!(self.rows(), 0); + + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + debug_assert_ne!(self.partition_counter.read(), 0); + // Construct the query adaptor over the partition data. // // `data` MUST contain at least one row, or the constructor panics. This @@ -322,6 +376,10 @@ impl PartitionData { // From this point on, all code MUST be infallible or the buffered data // contained within persisting may be dropped. + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + assert!(self.partition_counter.read() > 0); + // Increment the "started persist" counter. // // This is used to cheaply identify batches given to the @@ -351,6 +409,13 @@ impl PartitionData { // order). self.persisting.push(batch_ident, fsm); + // Invariant: the partition must not be marked as empty when there's an + // entry in the persisting list. + debug_assert!(!self.is_empty()); + + // Invariant: the number of rows is always > 0 at this point. + debug_assert_ne!(self.rows(), 0); + Some(data) } @@ -364,6 +429,10 @@ impl PartitionData { /// This method panics if [`Self`] is not marked as undergoing a persist /// operation, or `batch` is not currently being persisted. pub(crate) fn mark_persisted(&mut self, batch: PersistingData) -> SequenceNumberSet { + // Invariant: the partition must not be marked as empty when marking + // persisting data as complete. 
+ debug_assert!(!self.is_empty()); + let fsm = self.persisting.remove(batch.batch_ident()); self.completed_persistence_count += 1; @@ -379,6 +448,22 @@ impl PartitionData { "marking partition persistence complete" ); + // Invariant: the non-empty partition counter is always >0 at this + // point because this partition is non-empty. + assert!(self.partition_counter.read() > 0); + + // If this partition is now empty, the buffered partition counter should + // be decremented to return the permit to the per-namespace pool. + // + // Check the actual partition data content (rather than relying on + // is_empty bool) as the list was modified above and the bool state may + // need updating. + if self.persisting.is_empty() && self.buffer.rows() == 0 { + // This partition is transitioning from non-empty to empty. + self.partition_counter.dec(); + self.is_empty = true; + } + // Return the set of IDs this buffer contained. fsm.into_sequence_number_set() } diff --git a/ingester/src/buffer_tree/partition/counter.rs b/ingester/src/buffer_tree/partition/counter.rs index 4efa09e6a8..1870f8bb91 100644 --- a/ingester/src/buffer_tree/partition/counter.rs +++ b/ingester/src/buffer_tree/partition/counter.rs @@ -3,10 +3,14 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; +use crate::buffer_tree::BufferWriteError; + /// A counter typed for counting partitions and applying a configured limit. /// -/// No ordering is guaranteed - increments and reads may be arbitrarily -/// reordered w.r.t other memory operations (relaxed ordering). +/// This value is completely unsynchronised (in the context of memory ordering) +/// and relies on external thread synchronisation. Calls to change the value of +/// this counter may be arbitrarily reordered with other memory operations - the +/// caller is responsible for providing external ordering where necessary. 
#[derive(Debug)] pub(crate) struct PartitionCounter { current: AtomicUsize, @@ -22,22 +26,51 @@ impl PartitionCounter { } /// Increment the counter by 1. - pub(crate) fn inc(&self) { - self.current.fetch_add(1, Ordering::Relaxed); + /// + /// Return [`Err`] if the configured limit has been reached, else returns + /// [`Ok`] if the current value is below the limit. + pub(crate) fn inc(&self) -> Result<(), BufferWriteError> { + if self.current.fetch_add(1, Ordering::Relaxed) >= self.max { + // The limit was exceeded, roll back the addition and return an + // error. + let count = self.current.fetch_sub(1, Ordering::Relaxed); + return Err(BufferWriteError::PartitionLimit { count: count - 1 }); + } + + Ok(()) + } + + /// Decrement the counter by 1. + /// + /// # Panics + /// + /// This method MAY panic if the counter wraps around - never decrement more + /// than previously incremented. + pub(crate) fn dec(&self) { + let v = self.current.fetch_sub(1, Ordering::Relaxed); + + // Correctness: the fetch operations are RMW, which are guaranteed to + // see a monotonic history, therefore any prior fetch_add always happens + // before the fetch_sub above. + debug_assert_ne!(v, usize::MAX); // MUST never wraparound + } + + /// Returns an error if the limit has been reached and a subsequent + /// increment would fail. + pub(crate) fn is_maxed(&self) -> Result<(), BufferWriteError> { + let count = self.read(); + if count >= self.max { + return Err(BufferWriteError::PartitionLimit { count }); + } + + Ok(()) } /// Read the approximate counter value. - /// - /// Reads may return stale values, but will always be monotonic. pub(crate) fn read(&self) -> usize { self.current.load(Ordering::Relaxed) } - /// Return `true` if the configured limit has been reached. 
- pub(crate) fn is_maxed(&self) -> bool { - self.read() >= self.max - } - #[cfg(test)] pub(crate) fn set(&self, v: usize) { self.current.store(v, Ordering::Relaxed) @@ -48,6 +81,8 @@ impl PartitionCounter { mod tests { use super::*; + use assert_matches::assert_matches; + #[test] fn test_counter() { const N: usize = 100; @@ -55,12 +90,23 @@ mod tests { let c = PartitionCounter::new(NonZeroUsize::new(N).unwrap()); for v in 0..N { - assert!(!c.is_maxed()); assert_eq!(c.read(), v); - c.inc(); + assert_matches!(c.is_maxed(), Ok(())); + assert!(c.inc().is_ok()); } assert_eq!(c.read(), N); - assert!(c.is_maxed()); + assert_matches!( + c.is_maxed(), + Err(BufferWriteError::PartitionLimit { count: N }) + ); + assert_matches!(c.inc(), Err(BufferWriteError::PartitionLimit { count: N })); + assert_eq!(c.read(), N); // Counter value unchanged. + + c.dec(); + assert_eq!(c.read(), N - 1); + assert_matches!(c.is_maxed(), Ok(())); + assert_matches!(c.inc(), Ok(())); + assert_eq!(c.read(), N); } } diff --git a/ingester/src/buffer_tree/partition/resolver/cache.rs b/ingester/src/buffer_tree/partition/resolver/cache.rs index c7e47b9878..395f0355a5 100644 --- a/ingester/src/buffer_tree/partition/resolver/cache.rs +++ b/ingester/src/buffer_tree/partition/resolver/cache.rs @@ -11,7 +11,9 @@ use super::r#trait::PartitionProvider; use crate::{ buffer_tree::{ namespace::NamespaceName, - partition::{resolver::SortKeyResolver, PartitionData, SortKeyState}, + partition::{ + counter::PartitionCounter, resolver::SortKeyResolver, PartitionData, SortKeyState, + }, table::metadata::TableMetadata, }, deferred_load::DeferredLoad, @@ -168,6 +170,7 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { // Use the cached PartitionKey instead of the caller's partition_key, // instead preferring to reuse the already-shared Arc in the cache. 
@@ -199,6 +202,7 @@ where table_id, table, SortKeyState::Deferred(Arc::new(sort_key_resolver)), + partition_counter, ))); } @@ -206,7 +210,14 @@ where // Otherwise delegate to the catalog / inner impl. self.inner - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await } } @@ -216,6 +227,8 @@ mod tests { // Harmless in tests - saves a bunch of extra vars. #![allow(clippy::await_holding_lock)] + use std::num::NonZeroUsize; + use data_types::SortedColumnSet; use iox_catalog::mem::MemCatalog; @@ -259,6 +272,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; @@ -302,6 +316,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; @@ -359,6 +374,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; @@ -394,6 +410,7 @@ mod tests { defer_namespace_name_1_sec(), other_table, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index a540766c2b..facdf5b7a2 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -15,7 +15,8 @@ use crate::{ buffer_tree::{ namespace::NamespaceName, partition::{ - resolver::build_sort_key_from_sort_key_ids_and_columns, PartitionData, SortKeyState, + counter::PartitionCounter, resolver::build_sort_key_from_sort_key_ids_and_columns, + PartitionData, SortKeyState, }, table::metadata::TableMetadata, }, @@ 
-76,6 +77,7 @@ impl PartitionProvider for CatalogPartitionResolver { namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { debug!( %partition_key, @@ -119,6 +121,7 @@ impl PartitionProvider for CatalogPartitionResolver { table_id, table, SortKeyState::Provided(sort_key, p_sort_key_ids), + partition_counter, ))) } } @@ -128,7 +131,7 @@ mod tests { // Harmless in tests - saves a bunch of extra vars. #![allow(clippy::await_holding_lock)] - use std::{sync::Arc, time::Duration}; + use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use assert_matches::assert_matches; use iox_catalog::{ @@ -181,6 +184,7 @@ mod tests { }, &metrics, )), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; diff --git a/ingester/src/buffer_tree/partition/resolver/coalesce.rs b/ingester/src/buffer_tree/partition/resolver/coalesce.rs index e08c8d31c0..2b170b2e36 100644 --- a/ingester/src/buffer_tree/partition/resolver/coalesce.rs +++ b/ingester/src/buffer_tree/partition/resolver/coalesce.rs @@ -14,7 +14,9 @@ use parking_lot::Mutex; use crate::{ buffer_tree::{ - namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata, + namespace::NamespaceName, + partition::{counter::PartitionCounter, PartitionData}, + table::metadata::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -147,6 +149,7 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { let key = Key { table_id, @@ -170,6 +173,7 @@ where namespace_name, table_id, table, + partition_counter, )); // Make the future poll-able by many callers, all of which @@ -233,6 +237,7 @@ async fn do_fetch( namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> where T: PartitionProvider + 'static, @@ -247,7 +252,14 @@ where // (which would cause the connection to be returned). 
tokio::spawn(async move { inner - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await }) .await @@ -258,6 +270,7 @@ where mod tests { use std::{ future, + num::NonZeroUsize, sync::Arc, task::{Context, Poll}, time::Duration, @@ -300,6 +313,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) }) .collect::>() @@ -334,6 +348,7 @@ mod tests { _namespace_name: Arc>, _table_id: TableId, _table: Arc>, + _partition_counter: Arc, ) -> core::pin::Pin< Box< dyn core::future::Future>> @@ -376,6 +391,7 @@ mod tests { Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, Arc::clone(&table_loader), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ); let pa_2 = layer.get_partition( ARBITRARY_PARTITION_KEY.clone(), @@ -383,6 +399,7 @@ mod tests { Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, Arc::clone(&table_loader), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ); let waker = futures::task::noop_waker(); @@ -403,6 +420,7 @@ mod tests { namespace_loader, ARBITRARY_TABLE_ID, table_loader, + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .with_timeout_panic(Duration::from_secs(5)) .await; @@ -433,6 +451,7 @@ mod tests { _namespace_name: Arc>, _table_id: TableId, _table: Arc>, + _partition_counter: Arc, ) -> Arc> { let waker = self.wait.notified(); let permit = self.sem.acquire().await.unwrap(); @@ -473,6 +492,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ); let waker = futures::task::noop_waker(); diff --git a/ingester/src/buffer_tree/partition/resolver/mock.rs b/ingester/src/buffer_tree/partition/resolver/mock.rs index f40da05a81..0fd2679353 100644 --- 
a/ingester/src/buffer_tree/partition/resolver/mock.rs +++ b/ingester/src/buffer_tree/partition/resolver/mock.rs @@ -9,7 +9,9 @@ use parking_lot::Mutex; use super::r#trait::PartitionProvider; use crate::{ buffer_tree::{ - namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata, + namespace::NamespaceName, + partition::{counter::PartitionCounter, PartitionData}, + table::metadata::TableMetadata, }, deferred_load::DeferredLoad, test_util::PartitionDataBuilder, @@ -57,6 +59,7 @@ impl PartitionProvider for MockPartitionProvider { namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { let p = self .partitions @@ -68,14 +71,17 @@ impl PartitionProvider for MockPartitionProvider { let mut p = p.with_namespace_id(namespace_id); - // If the test provided a namespace/table loader, use it, otherwise default to the - // one provided in this call. + // If the test provided a namespace/table loader, use it, otherwise + // default to the one provided in this call. 
if p.namespace_loader().is_none() { p = p.with_namespace_loader(namespace_name); } if p.table_loader().is_none() { p = p.with_table_loader(table); } + if p.partition_counter().is_none() { + p = p.with_partition_counter(partition_counter); + } Arc::new(Mutex::new(p.build())) } diff --git a/ingester/src/buffer_tree/partition/resolver/old_filter.rs b/ingester/src/buffer_tree/partition/resolver/old_filter.rs index 2276fd8a34..0c7ce9d0b4 100644 --- a/ingester/src/buffer_tree/partition/resolver/old_filter.rs +++ b/ingester/src/buffer_tree/partition/resolver/old_filter.rs @@ -14,7 +14,9 @@ use super::PartitionProvider; use crate::{ buffer_tree::{ namespace::NamespaceName, - partition::{resolver::SortKeyResolver, PartitionData, SortKeyState}, + partition::{ + counter::PartitionCounter, resolver::SortKeyResolver, PartitionData, SortKeyState, + }, table::metadata::TableMetadata, }, deferred_load::DeferredLoad, @@ -148,6 +150,7 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { let hash_id = PartitionHashId::new(table_id, &partition_key); @@ -189,6 +192,7 @@ where table_id, table, SortKeyState::Deferred(Arc::new(sort_key_resolver)), + partition_counter, ))); } @@ -202,14 +206,21 @@ where // This partition MAY be an old-style / row-ID-addressed partition // that needs querying for. 
self.inner - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await } } #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{num::NonZeroUsize, sync::Arc}; use data_types::PartitionId; use hashbrown::HashMap; @@ -248,6 +259,7 @@ mod tests { _namespace_name: Arc>, table_id: TableId, _table: Arc>, + _partition_counter: Arc, ) -> Arc> { let mut builder = PartitionDataBuilder::default(); @@ -366,7 +378,8 @@ mod tests { ARBITRARY_NAMESPACE_ID, defer_namespace_name_1_sec(), p.table_id, - defer_table_metadata_1_sec() + defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), )); let got_id = got.lock().partition_id().clone(); @@ -417,6 +430,7 @@ mod tests { defer_namespace_name_1_sec(), ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; diff --git a/ingester/src/buffer_tree/partition/resolver/trait.rs b/ingester/src/buffer_tree/partition/resolver/trait.rs index 7ba0e8ee1d..8ff58003e4 100644 --- a/ingester/src/buffer_tree/partition/resolver/trait.rs +++ b/ingester/src/buffer_tree/partition/resolver/trait.rs @@ -6,7 +6,9 @@ use parking_lot::Mutex; use crate::{ buffer_tree::{ - namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata, + namespace::NamespaceName, + partition::{counter::PartitionCounter, PartitionData}, + table::metadata::TableMetadata, }, deferred_load::DeferredLoad, }; @@ -27,6 +29,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug { namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc>; } @@ -42,16 +45,24 @@ where namespace_name: Arc>, table_id: TableId, table: Arc>, + partition_counter: Arc, ) -> Arc> { (**self) - .get_partition(partition_key, namespace_id, namespace_name, table_id, table) + .get_partition( + partition_key, + 
namespace_id, + namespace_name, + table_id, + table, + partition_counter, + ) .await } } #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{num::NonZeroUsize, sync::Arc}; use super::*; use crate::{ @@ -81,6 +92,7 @@ mod tests { Arc::clone(&namespace_loader), ARBITRARY_TABLE_ID, Arc::clone(&table_loader), + Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())), ) .await; assert_eq!( diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 253cb2a168..e24c3aa654 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -277,6 +277,7 @@ mod tests { table::{metadata::TableMetadata, metadata_resolver::mock::MockTableProvider}, }, deferred_load::{self, DeferredLoad}, + persist::{drain_buffer::persist_partitions, queue::mock::MockPersistQueue}, query::partition_response::PartitionResponse, test_util::{ defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder, @@ -821,24 +822,27 @@ mod tests { async fn test_write_query_partition_limit_enforced() { maybe_start_logging(); + let other_table_id = TableId::new(ARBITRARY_TABLE_ID.get() + 1); + let partition_provider = Arc::new( MockPartitionProvider::default() .with_partition( PartitionDataBuilder::new().with_partition_key(ARBITRARY_PARTITION_KEY.clone()), ) .with_partition( - PartitionDataBuilder::new().with_partition_key(PARTITION2_KEY.clone()), + PartitionDataBuilder::new() + .with_partition_key(PARTITION2_KEY.clone()) + .with_table_id(other_table_id), ), ); let table_provider = Arc::clone(&*ARBITRARY_TABLE_PROVIDER); - let partition_count_limit = 1; let buf = BufferTree::new( Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)), table_provider, partition_provider, - NonZeroUsize::new(partition_count_limit).unwrap(), + NonZeroUsize::new(1).unwrap(), Arc::new(MockPostWriteObserver::default()), Arc::new(metric::Registry::default()), ); @@ -861,23 +865,69 @@ mod tests { .expect("failed to perform first write"); // Second write to 
a second table should hit the limit and be rejected - let err = buf - .apply(IngestOp::Write(make_write_op( - &PARTITION2_KEY, - ARBITRARY_NAMESPACE_ID, - "BANANASareDIFFERENT", - TableId::new(ARBITRARY_TABLE_ID.get() + 1), - 0, - "BANANASareDIFFERENT,region=Asturias temp=35 4242424242", - None, - ))) - .await - .expect_err("limit should be enforced"); + { + let err = buf + .apply(IngestOp::Write(make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + "BANANASareDIFFERENT", + other_table_id, + 0, + "BANANASareDIFFERENT,region=Asturias temp=35 4242424242", + None, + ))) + .await + .expect_err("limit should be enforced"); - assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 }); + assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 }); + } - // Only one partition should exist (second was rejected) + // Only one partition should exist (second was rejected and not created) assert_eq!(buf.partitions().count(), 1); + + // Persisting all the data in the buffered partition should allow the + // second partition to be created and written to. + persist_partitions(buf.partitions(), &Arc::new(MockPersistQueue::default())).await; + + // The second write for the second partition must now be accepted. + // The first partition is empty after persisting, freeing its limit slot. + buf.apply(IngestOp::Write(make_write_op( + &PARTITION2_KEY, + ARBITRARY_NAMESPACE_ID, + "BANANASareDIFFERENT", + other_table_id, + 0, + "BANANASareDIFFERENT,region=Asturias temp=35 4242424242", + None, + ))) + .await + .expect("failed to perform write after persist"); + + // There should now be 2 partitions. + assert_eq!(buf.partitions().count(), 2); + + // And writing to the first, existing partition should fail, as the + // non-empty partition limit has been reached. 
+ { + let err = buf + .apply(IngestOp::Write(make_write_op( + &ARBITRARY_PARTITION_KEY, + ARBITRARY_NAMESPACE_ID, + &ARBITRARY_TABLE_NAME, + ARBITRARY_TABLE_ID, + 0, + format!( + r#"{},region=Asturias temp=35 4242424242"#, + &*ARBITRARY_TABLE_NAME + ) + .as_str(), + None, + ))) + .await + .expect_err("limit should be enforced"); + + assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 }); + } } /// Ensure partition pruning during query execution also prunes metadata diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs index 4be49b8aa0..062d9b6f0f 100644 --- a/ingester/src/buffer_tree/table.rs +++ b/ingester/src/buffer_tree/table.rs @@ -39,8 +39,6 @@ use crate::{ query_adaptor::QueryAdaptor, }; -const MAX_NAMESPACE_PARTITION_COUNT: usize = usize::MAX; - /// Data of a Table in a given Namespace #[derive(Debug)] pub(crate) struct TableData { @@ -58,13 +56,11 @@ pub(crate) struct TableData { // Map of partition key to its data partition_data: ArcMap>, - /// A counter tracking the approximate number of partitions currently - /// buffered. + /// A counter tracking the number of non-empty partitions currently + /// buffered for the parent namespace. /// - /// This counter is NOT atomically incremented w.r.t creation of the - /// partitions it tracks, and therefore is susceptible to "overrun", - /// breaching the configured partition count limit by a relatively small - /// degree. + /// This counter is eventually consistent / relaxed when read, but strongly + /// consistent when enforced. partition_count: Arc, post_write_observer: Arc, @@ -145,16 +141,15 @@ where let p = self.partition_data.get(&partition_key); let partition_data = match p { Some(p) => p, - None if self.partition_count.is_maxed() => { - // This namespace has exceeded the upper bound on partitions. - // - // This counter is approximate, but monotonic - the count may be - // over the desired limit. 
- return Err(BufferWriteError::PartitionLimit { - count: self.partition_count.read(), - }); - } None => { + // This namespace may have reached the upper bound on + // non-empty partitions. + // + // As an optimisation, check if the partition will be writeable + // before creating it, to avoid creating many + // impossible-to-write-to partitions. + self.partition_count.is_maxed()?; + let p = self .partition_provider .get_partition( @@ -163,16 +158,15 @@ where Arc::clone(&self.namespace_name), self.table_id, Arc::clone(&self.catalog_table), + Arc::clone(&self.partition_count), ) .await; + // Add the partition to the map. // // This MAY return a different instance than `p` if another // thread has already initialised the partition. - self.partition_data.get_or_insert_with(&partition_key, || { - self.partition_count.inc(); - p - }) + self.partition_data.get_or_insert_with(&partition_key, || p) } }; @@ -405,12 +399,12 @@ mod tests { #[tokio::test] async fn test_partition_init() { + let partition_counter = Arc::new(PartitionCounter::new(NonZeroUsize::new(42).unwrap())); + // Configure the mock partition provider to return a partition for a table ID. let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(PartitionDataBuilder::new())); - let partition_counter = Arc::new(PartitionCounter::new(NonZeroUsize::new(42).unwrap())); - let table = TableData::new( ARBITRARY_TABLE_ID, defer_table_metadata_1_sec(), @@ -445,7 +439,8 @@ mod tests { // Referencing the partition should succeed assert!(table.partition_data.get(&ARBITRARY_PARTITION_KEY).is_some()); - // The partition should have been recorded in the partition count. + // The partition should have been recorded in the "non-empty" partition + // count. 
assert_eq!(partition_counter.read(), 1); } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 5707aec1d5..7c4b570c41 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc, time::Duration}; use data_types::{ partition_template::TablePartitionTemplateOverride, ColumnId, ColumnSet, NamespaceId, @@ -18,7 +18,7 @@ use crate::{ name_resolver::{mock::MockNamespaceNameProvider, NamespaceNameProvider}, NamespaceName, }, - partition::{PartitionData, SortKeyState}, + partition::{counter::PartitionCounter, PartitionData, SortKeyState}, table::{ metadata::{TableMetadata, TableName}, metadata_resolver::{mock::MockTableProvider, TableProvider}, @@ -92,6 +92,7 @@ pub(crate) struct PartitionDataBuilder { table_loader: Option>>, namespace_loader: Option>>, sort_key: Option, + partition_counter: Option>, } impl Default for PartitionDataBuilder { @@ -99,11 +100,12 @@ impl Default for PartitionDataBuilder { Self { table_id: ARBITRARY_TABLE_ID, partition_key: ARBITRARY_PARTITION_KEY.clone(), - partition_id: Default::default(), - namespace_id: Default::default(), - table_loader: Default::default(), - namespace_loader: Default::default(), - sort_key: Default::default(), + partition_id: None, + namespace_id: None, + table_loader: None, + namespace_loader: None, + sort_key: None, + partition_counter: None, } } } @@ -170,6 +172,18 @@ impl PartitionDataBuilder { self } + pub(crate) fn with_partition_counter( + mut self, + partition_counter: Arc, + ) -> Self { + self.partition_counter = Some(partition_counter); + self + } + + pub(crate) fn partition_counter(&self) -> Option> { + self.partition_counter.clone() + } + /// Generate a valid [`PartitionData`] for use in tests where the exact values (or at least /// some of them) don't particularly matter. 
pub(crate) fn build(self) -> PartitionData { @@ -189,6 +203,8 @@ impl PartitionDataBuilder { self.table_id, self.table_loader.unwrap_or_else(defer_table_metadata_1_sec), self.sort_key.unwrap_or(SortKeyState::Provided(None, None)), + self.partition_counter + .unwrap_or_else(|| Arc::new(PartitionCounter::new(NonZeroUsize::MAX))), ) } }