feat: only limit non-empty partitions

This changes the per-namespace buffered partition limiter to only
consider non-empty partitions when enforcing the partition limit.

Empty partitions cost a small amount of RAM, but are not added to
the persist queue - only non-empty partitions will need persisting, so
the limiter only needs to limit non-empty partitions.

This commit also significantly improves the consistency properties of
the limiter - the limit no longer suffers from a small window of
"overrun" due to non-atomic updates w.r.t partition creation - the limit
is now exact.

As an optimisation, partitions are not created at all if the limit has
been reached, preventing an accumulation of empty partitions whilst the
limit is being enforced.
pull/24376/head
Dom Dwyer 2023-09-14 14:33:26 +02:00
parent 3978b07a43
commit 7ad26e6d0e
12 changed files with 350 additions and 87 deletions
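
The core of the "exact limit" claim is that the check and the increment are a
single atomic read-modify-write, with a compensating decrement on rejection.
A minimal sketch of that claim/rollback pattern, using illustrative names
rather than the ingester's own types:

use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative stand-in for the partition limiter; not the ingester API.
struct Limit {
    current: AtomicUsize,
    max: usize,
}

impl Limit {
    /// Claim one slot, or reject if the limit has been reached.
    fn try_claim(&self) -> Result<(), usize> {
        // fetch_add checks and increments in one atomic step, so no two
        // racing claims can both observe the same free slot below `max`.
        let prev = self.current.fetch_add(1, Ordering::Relaxed);
        if prev >= self.max {
            // Roll back the speculative increment. The raw counter may
            // briefly overshoot `max` while racing claims roll back, but
            // the number of successful claims never exceeds `max`.
            self.current.fetch_sub(1, Ordering::Relaxed);
            return Err(prev);
        }
        Ok(())
    }

    /// Return a previously claimed slot.
    fn release(&self) {
        self.current.fetch_sub(1, Ordering::Relaxed);
    }
}

In the diff below, `PartitionCounter::inc()` plays the `try_claim` role on the
empty-to-non-empty transition, and `dec()` the `release` role once persistence
empties the partition.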

View File

@ -81,13 +81,11 @@ pub(crate) struct NamespaceData<O> {
/// [`PartitionData`]: super::partition::PartitionData
partition_provider: Arc<dyn PartitionProvider>,
/// A counter tracking the approximate number of partitions currently
/// buffered.
/// A counter tracking the number of non-empty partitions currently
/// buffered for this namespace.
///
/// This counter is NOT atomically incremented w.r.t creation of the
/// partitions it tracks, and therefore is susceptible to "overrun",
/// breaching the configured partition count limit by a relatively small
/// degree.
/// This counter is eventually consistent / relaxed when read, but strongly
/// consistent when enforced.
partition_count: Arc<PartitionCounter>,
post_write_observer: Arc<O>,

View File

@ -12,10 +12,11 @@ use schema::{merge::SchemaMerger, sort::SortKey, Schema};
use self::{
buffer::{traits::Queryable, DataBuffer},
counter::PartitionCounter,
persisting::{BatchIdent, PersistingData},
persisting_list::PersistingList,
};
use super::{namespace::NamespaceName, table::metadata::TableMetadata};
use super::{namespace::NamespaceName, table::metadata::TableMetadata, BufferWriteError};
use crate::{
deferred_load::DeferredLoad, query::projection::OwnedProjection, query_adaptor::QueryAdaptor,
};
@ -93,6 +94,13 @@ pub struct PartitionData {
/// / deferred.
table: Arc<DeferredLoad<TableMetadata>>,
/// An optimised empty indicator that returns true when the `buffer` and
/// `persisting` list are empty: a subsequent query will not return any
/// rows.
///
/// False when this partition contains data.
is_empty: bool,
/// A [`DataBuffer`] for incoming writes.
buffer: DataBuffer,
@ -112,6 +120,15 @@ pub struct PartitionData {
/// The number of persist operations completed over the lifetime of this
/// [`PartitionData`].
completed_persistence_count: u64,
/// A counter tracking the number of non-empty partitions per namespace.
///
/// This value is incremented when this [`PartitionData`] transitions from
/// empty, to non-empty during write buffering. Likewise the value is
/// decremented when persistence is marked as complete and the persisted
/// data is dropped, transitioning the [`PartitionData`] from non-empty to
/// empty.
partition_counter: Arc<PartitionCounter>,
}
impl PartitionData {
@ -125,6 +142,7 @@ impl PartitionData {
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
sort_key: SortKeyState,
partition_counter: Arc<PartitionCounter>,
) -> Self {
Self {
partition_id,
@ -138,6 +156,8 @@ impl PartitionData {
persisting: PersistingList::default(),
started_persistence_count: BatchIdent::default(),
completed_persistence_count: 0,
partition_counter,
is_empty: true,
}
}
@ -146,10 +166,28 @@ impl PartitionData {
&mut self,
mb: MutableBatch,
sequence_number: SequenceNumber,
) -> Result<(), mutable_batch::Error> {
) -> Result<(), BufferWriteError> {
if self.is_empty() {
// This partition is transitioning from empty, to non-empty.
//
// Because non-empty partitions are bounded per namespace, a check
// must be made to ensure accepting this write does not exceed the
// (exact) limit.
self.partition_counter.inc()?;
self.is_empty = false;
}
// Invariant: the non-empty partition counter is always >0 at this
// point because this partition is non-empty.
debug_assert_ne!(self.partition_counter.read(), 0);
// Buffer the write.
self.buffer.buffer_write(mb, sequence_number)?;
// Invariant: if the partition contains a buffered write, it must report
// non-empty.
debug_assert!(!self.is_empty());
trace!(
namespace_id = %self.namespace_id,
table_id = %self.table_id,
@ -187,7 +225,16 @@ impl PartitionData {
/// Returns true if this partition contains no data (buffered or
/// persisting).
pub(crate) fn is_empty(&self) -> bool {
self.persisting.is_empty() && self.buffer.rows() == 0
// Assert the optimised bool is in-sync with the actual state.
debug_assert_eq!(
self.persisting.is_empty() && self.buffer.rows() == 0,
self.is_empty
);
// Optimisation: instead of inspecting the persisting list, and
// inspecting the row count of the data buffer, the empty state can be
// correctly and cheaply tracked separately.
self.is_empty
}
/// Return the timestamp min/max values for the data contained within this
@ -271,6 +318,13 @@ impl PartitionData {
return None;
}
// Invariant: the number of rows is always > 0 at this point.
debug_assert_ne!(self.rows(), 0);
// Invariant: the non-empty partition counter is always >0 at this
// point because this partition is non-empty.
debug_assert_ne!(self.partition_counter.read(), 0);
// Construct the query adaptor over the partition data.
//
// `data` MUST contain at least one row, or the constructor panics. This
@ -322,6 +376,10 @@ impl PartitionData {
// From this point on, all code MUST be infallible or the buffered data
// contained within persisting may be dropped.
// Invariant: the non-empty partition counter is always >0 at this
// point because this partition is non-empty.
assert!(self.partition_counter.read() > 0);
// Increment the "started persist" counter.
//
// This is used to cheaply identify batches given to the
@ -351,6 +409,13 @@ impl PartitionData {
// order).
self.persisting.push(batch_ident, fsm);
// Invariant: the partition must not be marked as empty when there's an
// entry in the persisting list.
debug_assert!(!self.is_empty());
// Invariant: the number of rows is always > 0 at this point.
debug_assert_ne!(self.rows(), 0);
Some(data)
}
@ -364,6 +429,10 @@ impl PartitionData {
/// This method panics if [`Self`] is not marked as undergoing a persist
/// operation, or `batch` is not currently being persisted.
pub(crate) fn mark_persisted(&mut self, batch: PersistingData) -> SequenceNumberSet {
// Invariant: the partition must not be marked as empty when marking
// persisting data as complete.
debug_assert!(!self.is_empty());
let fsm = self.persisting.remove(batch.batch_ident());
self.completed_persistence_count += 1;
@ -379,6 +448,22 @@ impl PartitionData {
"marking partition persistence complete"
);
// Invariant: the non-empty partition counter is always >0 at this
// point because this partition is non-empty.
assert!(self.partition_counter.read() > 0);
// If this partition is now empty, the buffered partition counter should
// be decremented to return the permit to the per-namespace pool.
//
// Check the actual partition data content (rather than relying on
// is_empty bool) as the list was modified above and the bool state may
// need updating.
if self.persisting.is_empty() && self.buffer.rows() == 0 {
// This partition is transitioning from non-empty to empty.
self.partition_counter.dec();
self.is_empty = true;
}
// Return the set of IDs this buffer contained.
fsm.into_sequence_number_set()
}
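
Taken together, `buffer_write` and `mark_persisted` form a small lifecycle:
claim a counter slot on the empty-to-non-empty edge, return it on the
non-empty-to-empty edge. A condensed sketch of those two transitions, with a
plain row count standing in for the buffer and persisting list (simplified,
illustrative types only):

use std::sync::atomic::{AtomicUsize, Ordering};

/// Simplified stand-in for PartitionData's empty-state tracking.
struct Slot {
    is_empty: bool,
    rows: usize,
}

impl Slot {
    /// Buffer a write, claiming a limit slot on the empty -> non-empty edge.
    fn buffer_write(
        &mut self,
        counter: &AtomicUsize,
        max: usize,
        n: usize,
    ) -> Result<(), usize> {
        if self.is_empty {
            // Atomic claim: reject before buffering anything if at the limit.
            let prev = counter.fetch_add(1, Ordering::Relaxed);
            if prev >= max {
                counter.fetch_sub(1, Ordering::Relaxed);
                return Err(prev);
            }
            self.is_empty = false;
        }
        self.rows += n;
        Ok(())
    }

    /// Complete a persist job, returning the slot on the non-empty -> empty
    /// edge so another partition in the namespace may buffer data.
    fn mark_persisted(&mut self, counter: &AtomicUsize, persisted: usize) {
        self.rows -= persisted;
        if self.rows == 0 {
            counter.fetch_sub(1, Ordering::Relaxed);
            self.is_empty = true;
        }
    }
}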

View File

@ -3,10 +3,14 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};
use crate::buffer_tree::BufferWriteError;
/// A counter typed for counting partitions and applying a configured limit.
///
/// No ordering is guaranteed - increments and reads may be arbitrarily
/// reordered w.r.t other memory operations (relaxed ordering).
/// This value is completely unsynchronised (in the context of memory ordering)
/// and relies on external thread synchronisation. Calls to change the value of
/// this counter may be arbitrarily reordered with other memory operations - the
/// caller is responsible for providing external ordering where necessary.
#[derive(Debug)]
pub(crate) struct PartitionCounter {
current: AtomicUsize,
@ -22,22 +26,51 @@ impl PartitionCounter {
}
/// Increment the counter by 1.
pub(crate) fn inc(&self) {
self.current.fetch_add(1, Ordering::Relaxed);
///
/// Return [`Err`] if the configured limit has been reached, else returns
/// [`Ok`] if the current value is below the limit.
pub(crate) fn inc(&self) -> Result<(), BufferWriteError> {
if self.current.fetch_add(1, Ordering::Relaxed) >= self.max {
// The limit was exceeded, roll back the addition and return an
// error.
let count = self.current.fetch_sub(1, Ordering::Relaxed);
return Err(BufferWriteError::PartitionLimit { count: count - 1 });
}
Ok(())
}
/// Decrement the counter by 1.
///
/// # Panics
///
/// This method MAY panic if the counter wraps around - never decrement more
/// than previously incremented.
pub(crate) fn dec(&self) {
let v = self.current.fetch_sub(1, Ordering::Relaxed);
// Correctness: the fetch operations are RMW, which are guaranteed to
// see a monotonic history, therefore any prior fetch_add always happens
// before the fetch_sub above.
debug_assert_ne!(v, usize::MAX); // MUST never wrap around
}
/// Returns an error if the limit has been reached and a subsequent
/// increment would fail.
pub(crate) fn is_maxed(&self) -> Result<(), BufferWriteError> {
let count = self.read();
if count >= self.max {
return Err(BufferWriteError::PartitionLimit { count });
}
Ok(())
}
/// Read the approximate counter value.
///
/// Reads may return stale values, but will always be monotonic.
pub(crate) fn read(&self) -> usize {
self.current.load(Ordering::Relaxed)
}
/// Return `true` if the configured limit has been reached.
pub(crate) fn is_maxed(&self) -> bool {
self.read() >= self.max
}
#[cfg(test)]
pub(crate) fn set(&self, v: usize) {
self.current.store(v, Ordering::Relaxed)
@ -48,6 +81,8 @@ impl PartitionCounter {
mod tests {
use super::*;
use assert_matches::assert_matches;
#[test]
fn test_counter() {
const N: usize = 100;
@ -55,12 +90,23 @@ mod tests {
let c = PartitionCounter::new(NonZeroUsize::new(N).unwrap());
for v in 0..N {
assert!(!c.is_maxed());
assert_eq!(c.read(), v);
c.inc();
assert_matches!(c.is_maxed(), Ok(()));
assert!(c.inc().is_ok());
}
assert_eq!(c.read(), N);
assert!(c.is_maxed());
assert_matches!(
c.is_maxed(),
Err(BufferWriteError::PartitionLimit { count: N })
);
assert_matches!(c.inc(), Err(BufferWriteError::PartitionLimit { count: N }));
assert_eq!(c.read(), N); // Counter value unchanged.
c.dec();
assert_eq!(c.read(), N - 1);
assert_matches!(c.is_maxed(), Ok(()));
assert_matches!(c.inc(), Ok(()));
assert_eq!(c.read(), N);
}
}
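
The relaxed memory ordering is sufficient here because enforcement rides
entirely on the read-modify-write operations, which observe a single monotonic
history of the counter. A quick standalone demonstration (a sketch, not part
of this change) that successful claims never exceed the limit even under
contention:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;

fn main() {
    const MAX: usize = 4;
    let counter = Arc::new(AtomicUsize::new(0));
    let successes = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..64)
        .map(|_| {
            let counter = Arc::clone(&counter);
            let successes = Arc::clone(&successes);
            thread::spawn(move || {
                if counter.fetch_add(1, Ordering::Relaxed) < MAX {
                    // Claim succeeded; hold the slot.
                    successes.fetch_add(1, Ordering::Relaxed);
                } else {
                    // Over the limit: roll back the speculative increment.
                    counter.fetch_sub(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // Reads are relaxed, but the RMW-only updates bound the successes.
    assert!(successes.load(Ordering::Relaxed) <= MAX);
}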

View File

@ -11,7 +11,9 @@ use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{resolver::SortKeyResolver, PartitionData, SortKeyState},
partition::{
counter::PartitionCounter, resolver::SortKeyResolver, PartitionData, SortKeyState,
},
table::metadata::TableMetadata,
},
deferred_load::DeferredLoad,
@ -168,6 +170,7 @@ where
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
// Use the cached PartitionKey instead of the caller's partition_key,
// preferring to reuse the already-shared Arc<str> in the cache.
@ -199,6 +202,7 @@ where
table_id,
table,
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
partition_counter,
)));
}
@ -206,7 +210,14 @@ where
// Otherwise delegate to the catalog / inner impl.
self.inner
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table,
partition_counter,
)
.await
}
}
@ -216,6 +227,8 @@ mod tests {
// Harmless in tests - saves a bunch of extra vars.
#![allow(clippy::await_holding_lock)]
use std::num::NonZeroUsize;
use data_types::SortedColumnSet;
use iox_catalog::mem::MemCatalog;
@ -259,6 +272,7 @@ mod tests {
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;
@ -302,6 +316,7 @@ mod tests {
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;
@ -359,6 +374,7 @@ mod tests {
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;
@ -394,6 +410,7 @@ mod tests {
defer_namespace_name_1_sec(),
other_table,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;

View File

@ -15,7 +15,8 @@ use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{
resolver::build_sort_key_from_sort_key_ids_and_columns, PartitionData, SortKeyState,
counter::PartitionCounter, resolver::build_sort_key_from_sort_key_ids_and_columns,
PartitionData, SortKeyState,
},
table::metadata::TableMetadata,
},
@ -76,6 +77,7 @@ impl PartitionProvider for CatalogPartitionResolver {
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
debug!(
%partition_key,
@ -119,6 +121,7 @@ impl PartitionProvider for CatalogPartitionResolver {
table_id,
table,
SortKeyState::Provided(sort_key, p_sort_key_ids),
partition_counter,
)))
}
}
@ -128,7 +131,7 @@ mod tests {
// Harmless in tests - saves a bunch of extra vars.
#![allow(clippy::await_holding_lock)]
use std::{sync::Arc, time::Duration};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use iox_catalog::{
@ -181,6 +184,7 @@ mod tests {
},
&metrics,
)),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;

View File

@ -14,7 +14,9 @@ use parking_lot::Mutex;
use crate::{
buffer_tree::{
namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata,
namespace::NamespaceName,
partition::{counter::PartitionCounter, PartitionData},
table::metadata::TableMetadata,
},
deferred_load::DeferredLoad,
};
@ -147,6 +149,7 @@ where
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
let key = Key {
table_id,
@ -170,6 +173,7 @@ where
namespace_name,
table_id,
table,
partition_counter,
));
// Make the future poll-able by many callers, all of which
@ -233,6 +237,7 @@ async fn do_fetch<T>(
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>>
where
T: PartitionProvider + 'static,
@ -247,7 +252,14 @@ where
// (which would cause the connection to be returned).
tokio::spawn(async move {
inner
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table,
partition_counter,
)
.await
})
.await
@ -258,6 +270,7 @@ where
mod tests {
use std::{
future,
num::NonZeroUsize,
sync::Arc,
task::{Context, Poll},
time::Duration,
@ -300,6 +313,7 @@ mod tests {
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
})
.collect::<FuturesUnordered<_>>()
@ -334,6 +348,7 @@ mod tests {
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
_table_id: TableId,
_table: Arc<DeferredLoad<TableMetadata>>,
_partition_counter: Arc<PartitionCounter>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Arc<Mutex<PartitionData>>>
@ -376,6 +391,7 @@ mod tests {
Arc::clone(&namespace_loader),
ARBITRARY_TABLE_ID,
Arc::clone(&table_loader),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
);
let pa_2 = layer.get_partition(
ARBITRARY_PARTITION_KEY.clone(),
@ -383,6 +399,7 @@ mod tests {
Arc::clone(&namespace_loader),
ARBITRARY_TABLE_ID,
Arc::clone(&table_loader),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
);
let waker = futures::task::noop_waker();
@ -403,6 +420,7 @@ mod tests {
namespace_loader,
ARBITRARY_TABLE_ID,
table_loader,
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.with_timeout_panic(Duration::from_secs(5))
.await;
@ -433,6 +451,7 @@ mod tests {
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
_table_id: TableId,
_table: Arc<DeferredLoad<TableMetadata>>,
_partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
let waker = self.wait.notified();
let permit = self.sem.acquire().await.unwrap();
@ -473,6 +492,7 @@ mod tests {
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
);
let waker = futures::task::noop_waker();

View File

@ -9,7 +9,9 @@ use parking_lot::Mutex;
use super::r#trait::PartitionProvider;
use crate::{
buffer_tree::{
namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata,
namespace::NamespaceName,
partition::{counter::PartitionCounter, PartitionData},
table::metadata::TableMetadata,
},
deferred_load::DeferredLoad,
test_util::PartitionDataBuilder,
@ -57,6 +59,7 @@ impl PartitionProvider for MockPartitionProvider {
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
let p = self
.partitions
@ -68,14 +71,17 @@ impl PartitionProvider for MockPartitionProvider {
let mut p = p.with_namespace_id(namespace_id);
// If the test provided a namespace/table loader, use it, otherwise default to the
// one provided in this call.
// If the test provided a namespace/table loader, use it, otherwise
// default to the one provided in this call.
if p.namespace_loader().is_none() {
p = p.with_namespace_loader(namespace_name);
}
if p.table_loader().is_none() {
p = p.with_table_loader(table);
}
if p.partition_counter().is_none() {
p = p.with_partition_counter(partition_counter);
}
Arc::new(Mutex::new(p.build()))
}

View File

@ -14,7 +14,9 @@ use super::PartitionProvider;
use crate::{
buffer_tree::{
namespace::NamespaceName,
partition::{resolver::SortKeyResolver, PartitionData, SortKeyState},
partition::{
counter::PartitionCounter, resolver::SortKeyResolver, PartitionData, SortKeyState,
},
table::metadata::TableMetadata,
},
deferred_load::DeferredLoad,
@ -148,6 +150,7 @@ where
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
let hash_id = PartitionHashId::new(table_id, &partition_key);
@ -189,6 +192,7 @@ where
table_id,
table,
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
partition_counter,
)));
}
@ -202,14 +206,21 @@ where
// This partition MAY be an old-style / row-ID-addressed partition
// that needs querying for.
self.inner
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table,
partition_counter,
)
.await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{num::NonZeroUsize, sync::Arc};
use data_types::PartitionId;
use hashbrown::HashMap;
@ -248,6 +259,7 @@ mod tests {
_namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
_table: Arc<DeferredLoad<TableMetadata>>,
_partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
let mut builder = PartitionDataBuilder::default();
@ -366,7 +378,8 @@ mod tests {
ARBITRARY_NAMESPACE_ID,
defer_namespace_name_1_sec(),
p.table_id,
defer_table_metadata_1_sec()
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
));
let got_id = got.lock().partition_id().clone();
@ -417,6 +430,7 @@ mod tests {
defer_namespace_name_1_sec(),
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;

View File

@ -6,7 +6,9 @@ use parking_lot::Mutex;
use crate::{
buffer_tree::{
namespace::NamespaceName, partition::PartitionData, table::metadata::TableMetadata,
namespace::NamespaceName,
partition::{counter::PartitionCounter, PartitionData},
table::metadata::TableMetadata,
},
deferred_load::DeferredLoad,
};
@ -27,6 +29,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>>;
}
@ -42,16 +45,24 @@ where
namespace_name: Arc<DeferredLoad<NamespaceName>>,
table_id: TableId,
table: Arc<DeferredLoad<TableMetadata>>,
partition_counter: Arc<PartitionCounter>,
) -> Arc<Mutex<PartitionData>> {
(**self)
.get_partition(partition_key, namespace_id, namespace_name, table_id, table)
.get_partition(
partition_key,
namespace_id,
namespace_name,
table_id,
table,
partition_counter,
)
.await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{num::NonZeroUsize, sync::Arc};
use super::*;
use crate::{
@ -81,6 +92,7 @@ mod tests {
Arc::clone(&namespace_loader),
ARBITRARY_TABLE_ID,
Arc::clone(&table_loader),
Arc::new(PartitionCounter::new(NonZeroUsize::new(1).unwrap())),
)
.await;
assert_eq!(

View File

@ -277,6 +277,7 @@ mod tests {
table::{metadata::TableMetadata, metadata_resolver::mock::MockTableProvider},
},
deferred_load::{self, DeferredLoad},
persist::{drain_buffer::persist_partitions, queue::mock::MockPersistQueue},
query::partition_response::PartitionResponse,
test_util::{
defer_namespace_name_1_ms, make_write_op, PartitionDataBuilder,
@ -821,24 +822,27 @@ mod tests {
async fn test_write_query_partition_limit_enforced() {
maybe_start_logging();
let other_table_id = TableId::new(ARBITRARY_TABLE_ID.get() + 1);
let partition_provider = Arc::new(
MockPartitionProvider::default()
.with_partition(
PartitionDataBuilder::new().with_partition_key(ARBITRARY_PARTITION_KEY.clone()),
)
.with_partition(
PartitionDataBuilder::new().with_partition_key(PARTITION2_KEY.clone()),
PartitionDataBuilder::new()
.with_partition_key(PARTITION2_KEY.clone())
.with_table_id(other_table_id),
),
);
let table_provider = Arc::clone(&*ARBITRARY_TABLE_PROVIDER);
let partition_count_limit = 1;
let buf = BufferTree::new(
Arc::new(MockNamespaceNameProvider::new(&**ARBITRARY_NAMESPACE_NAME)),
table_provider,
partition_provider,
NonZeroUsize::new(partition_count_limit).unwrap(),
NonZeroUsize::new(1).unwrap(),
Arc::new(MockPostWriteObserver::default()),
Arc::new(metric::Registry::default()),
);
@ -861,23 +865,69 @@ mod tests {
.expect("failed to perform first write");
// Second write to a second table should hit the limit and be rejected
let err = buf
.apply(IngestOp::Write(make_write_op(
&PARTITION2_KEY,
ARBITRARY_NAMESPACE_ID,
"BANANASareDIFFERENT",
TableId::new(ARBITRARY_TABLE_ID.get() + 1),
0,
"BANANASareDIFFERENT,region=Asturias temp=35 4242424242",
None,
)))
.await
.expect_err("limit should be enforced");
{
let err = buf
.apply(IngestOp::Write(make_write_op(
&PARTITION2_KEY,
ARBITRARY_NAMESPACE_ID,
"BANANASareDIFFERENT",
other_table_id,
0,
"BANANASareDIFFERENT,region=Asturias temp=35 4242424242",
None,
)))
.await
.expect_err("limit should be enforced");
assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 });
assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 });
}
// Only one partition should exist (second was rejected)
// Only one partition should exist (second was rejected and not created)
assert_eq!(buf.partitions().count(), 1);
// Persisting all the data in the buffered partition should allow the
// second partition to be created and written to.
persist_partitions(buf.partitions(), &Arc::new(MockPersistQueue::default())).await;
// The second write for the second partition must now be accepted.
buf.apply(IngestOp::Write(make_write_op(
&PARTITION2_KEY,
ARBITRARY_NAMESPACE_ID,
"BANANASareDIFFERENT",
other_table_id,
0,
"BANANASareDIFFERENT,region=Asturias temp=35 4242424242",
None,
)))
.await
.expect("failed to perform write after persist");
// There should now be 2 partitions.
assert_eq!(buf.partitions().count(), 2);
// And writing to the first, existing partition should fail, as the
// non-empty partition limit has been reached.
{
let err = buf
.apply(IngestOp::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
)
.as_str(),
None,
)))
.await
.expect_err("limit should be enforced");
assert_matches!(err, BufferWriteError::PartitionLimit { count: 1 });
}
}
/// Ensure partition pruning during query execution also prunes metadata

View File

@ -39,8 +39,6 @@ use crate::{
query_adaptor::QueryAdaptor,
};
const MAX_NAMESPACE_PARTITION_COUNT: usize = usize::MAX;
/// Data of a Table in a given Namespace
#[derive(Debug)]
pub(crate) struct TableData<O> {
@ -58,13 +56,11 @@ pub(crate) struct TableData<O> {
// Map of partition key to its data
partition_data: ArcMap<PartitionKey, Mutex<PartitionData>>,
/// A counter tracking the approximate number of partitions currently
/// buffered.
/// A counter tracking the number of non-empty partitions currently
/// buffered for the parent namespace.
///
/// This counter is NOT atomically incremented w.r.t creation of the
/// partitions it tracks, and therefore is susceptible to "overrun",
/// breaching the configured partition count limit by a relatively small
/// degree.
/// This counter is eventually consistent / relaxed when read, but strongly
/// consistent when enforced.
partition_count: Arc<PartitionCounter>,
post_write_observer: Arc<O>,
@ -145,16 +141,15 @@ where
let p = self.partition_data.get(&partition_key);
let partition_data = match p {
Some(p) => p,
None if self.partition_count.is_maxed() => {
// This namespace has exceeded the upper bound on partitions.
//
// This counter is approximate, but monotonic - the count may be
// over the desired limit.
return Err(BufferWriteError::PartitionLimit {
count: self.partition_count.read(),
});
}
None => {
// This namespace may have exceeded the upper bound on
// non-empty partitions.
//
// As an optimisation, check if the partition will be writeable
// before creating it, to avoid creating many
// impossible-to-write-to partitions.
self.partition_count.is_maxed()?;
let p = self
.partition_provider
.get_partition(
@ -163,16 +158,15 @@ where
Arc::clone(&self.namespace_name),
self.table_id,
Arc::clone(&self.catalog_table),
Arc::clone(&self.partition_count),
)
.await;
// Add the partition to the map.
//
// This MAY return a different instance than `p` if another
// thread has already initialised the partition.
self.partition_data.get_or_insert_with(&partition_key, || {
self.partition_count.inc();
p
})
self.partition_data.get_or_insert_with(&partition_key, || p)
}
};
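
The `is_maxed()` call above is an advisory pre-check - the authoritative,
exact enforcement still happens inside `PartitionData::buffer_write()` via
`inc()` - but it prevents rejected writes from leaving empty partitions
behind. A rough sketch of that shape, with simplified synchronous stand-ins
for the resolver and partition map:

use std::collections::HashMap;

struct Counter {
    current: usize,
    max: usize,
}

impl Counter {
    /// Advisory pre-check; the exact check happens at write time.
    fn is_maxed(&self) -> Result<(), usize> {
        if self.current >= self.max {
            Err(self.current)
        } else {
            Ok(())
        }
    }
}

/// Illustrative get-or-create: an existing partition is always returned,
/// but a new (empty) one is only built while a write could still succeed.
fn get_or_create<'a>(
    partitions: &'a mut HashMap<String, Vec<u8>>,
    counter: &Counter,
    key: &str,
) -> Result<&'a mut Vec<u8>, usize> {
    if !partitions.contains_key(key) {
        // Optimisation: don't accumulate impossible-to-write-to partitions
        // while the limit is being enforced.
        counter.is_maxed()?;
        partitions.insert(key.to_owned(), Vec::new());
    }
    Ok(partitions.get_mut(key).expect("just inserted or present"))
}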
@ -405,12 +399,12 @@ mod tests {
#[tokio::test]
async fn test_partition_init() {
let partition_counter = Arc::new(PartitionCounter::new(NonZeroUsize::new(42).unwrap()));
// Configure the mock partition provider to return a partition for a table ID.
let partition_provider =
Arc::new(MockPartitionProvider::default().with_partition(PartitionDataBuilder::new()));
let partition_counter = Arc::new(PartitionCounter::new(NonZeroUsize::new(42).unwrap()));
let table = TableData::new(
ARBITRARY_TABLE_ID,
defer_table_metadata_1_sec(),
@ -445,7 +439,8 @@ mod tests {
// Referencing the partition should succeed
assert!(table.partition_data.get(&ARBITRARY_PARTITION_KEY).is_some());
// The partition should have been recorded in the partition count.
// The partition should have been recorded in the "non-empty" partition
// count.
assert_eq!(partition_counter.read(), 1);
}

View File

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc, time::Duration};
use data_types::{
partition_template::TablePartitionTemplateOverride, ColumnId, ColumnSet, NamespaceId,
@ -18,7 +18,7 @@ use crate::{
name_resolver::{mock::MockNamespaceNameProvider, NamespaceNameProvider},
NamespaceName,
},
partition::{PartitionData, SortKeyState},
partition::{counter::PartitionCounter, PartitionData, SortKeyState},
table::{
metadata::{TableMetadata, TableName},
metadata_resolver::{mock::MockTableProvider, TableProvider},
@ -92,6 +92,7 @@ pub(crate) struct PartitionDataBuilder {
table_loader: Option<Arc<DeferredLoad<TableMetadata>>>,
namespace_loader: Option<Arc<DeferredLoad<NamespaceName>>>,
sort_key: Option<SortKeyState>,
partition_counter: Option<Arc<PartitionCounter>>,
}
impl Default for PartitionDataBuilder {
@ -99,11 +100,12 @@ impl Default for PartitionDataBuilder {
Self {
table_id: ARBITRARY_TABLE_ID,
partition_key: ARBITRARY_PARTITION_KEY.clone(),
partition_id: Default::default(),
namespace_id: Default::default(),
table_loader: Default::default(),
namespace_loader: Default::default(),
sort_key: Default::default(),
partition_id: None,
namespace_id: None,
table_loader: None,
namespace_loader: None,
sort_key: None,
partition_counter: None,
}
}
}
@ -170,6 +172,18 @@ impl PartitionDataBuilder {
self
}
pub(crate) fn with_partition_counter(
mut self,
partition_counter: Arc<PartitionCounter>,
) -> Self {
self.partition_counter = Some(partition_counter);
self
}
pub(crate) fn partition_counter(&self) -> Option<Arc<PartitionCounter>> {
self.partition_counter.clone()
}
/// Generate a valid [`PartitionData`] for use in tests where the exact values (or at least
/// some of them) don't particularly matter.
pub(crate) fn build(self) -> PartitionData {
@ -189,6 +203,8 @@ impl PartitionDataBuilder {
self.table_id,
self.table_loader.unwrap_or_else(defer_table_metadata_1_sec),
self.sort_key.unwrap_or(SortKeyState::Provided(None, None)),
self.partition_counter
.unwrap_or_else(|| Arc::new(PartitionCounter::new(NonZeroUsize::MAX))),
)
}
}