refactor: decouple partition init from table
Removes the "how" of initialising a per-partition buffer structure (PartitionData) from the per-table buffer (TableData). This is a cleaner separation of concerns: a table buffer is responsible for addressing and initialising per-table partitions as necessary, and for buffering ops for them. It no longer has to be concerned with the series of steps needed to look up the various bits of data required to construct a PartitionData; instead it defers to an abstract PartitionProvider. This abstract provider can be layered up to provide more complex behaviours - I intend to add a read-through cache impl that decorates the catalog impl added in this commit, which should eliminate most partition queries at ingester startup by way of the indirection added here.
parent b11da1d98b
commit 61aecc3044
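The "layering" the message describes amounts to decorating the PartitionProvider trait introduced below (see resolver/trait.rs). As a rough sketch of the intended read-through cache - not part of this commit; the pre-warmed map shape and field names are assumptions - a decorator might serve pre-fetched catalog rows and fall through to the wrapped resolver on a miss:

    use std::{collections::HashMap, sync::Arc};

    use async_trait::async_trait;
    use data_types::{PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
    use parking_lot::Mutex;

    use crate::data::partition::{resolver::PartitionProvider, PartitionData};

    /// A hypothetical read-through cache decorating any inner
    /// [`PartitionProvider`] (e.g. the `CatalogPartitionResolver` below).
    #[derive(Debug)]
    pub(crate) struct CachedPartitionResolver<T> {
        /// Rows pre-fetched from the catalog at startup, keyed the same way
        /// the resolver is addressed. Entries are taken at most once, so a
        /// stale persist offset is never served twice.
        warmed: Mutex<HashMap<(PartitionKey, ShardId, TableId), (PartitionId, Option<SequenceNumber>)>>,
        /// The decorated resolver, hit only on a cache miss.
        inner: T,
    }

    #[async_trait]
    impl<T> PartitionProvider for CachedPartitionResolver<T>
    where
        T: PartitionProvider,
    {
        async fn get_partition(
            &self,
            partition_key: PartitionKey,
            shard_id: ShardId,
            table_id: TableId,
            table_name: Arc<str>,
        ) -> PartitionData {
            // A cache hit initialises the PartitionData without a catalog query.
            if let Some((id, max_persisted)) = self
                .warmed
                .lock()
                .remove(&(partition_key.clone(), shard_id, table_id))
            {
                return PartitionData::new(id, shard_id, table_id, table_name, max_persisted);
            }

            // Cache miss: fall through to the decorated resolver.
            self.inner
                .get_partition(partition_key, shard_id, table_id, table_name)
                .await
        }
    }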
ingester/src/data.rs
@@ -164,12 +164,7 @@ impl IngesterData {
             .get(&shard_id)
             .context(ShardNotFoundSnafu { shard_id })?;

         shard_data
-            .buffer_operation(
-                dml_operation,
-                self.catalog.as_ref(),
-                lifecycle_handle,
-                &self.exec,
-            )
+            .buffer_operation(dml_operation, &self.catalog, lifecycle_handle, &self.exec)
             .await
     }
@@ -1346,12 +1341,7 @@ mod tests {
         // to 1 already, so it shouldn't be buffered and the buffer should
         // remain empty.
         let should_pause = data
-            .buffer_operation(
-                DmlOperation::Write(w1),
-                catalog.as_ref(),
-                &manager.handle(),
-                &exec,
-            )
+            .buffer_operation(DmlOperation::Write(w1), &catalog, &manager.handle(), &exec)
             .await
             .unwrap();
         {
@@ -1367,14 +1357,9 @@ mod tests {
         assert!(!should_pause);

         // w2 should be in the buffer
-        data.buffer_operation(
-            DmlOperation::Write(w2),
-            catalog.as_ref(),
-            &manager.handle(),
-            &exec,
-        )
-        .await
-        .unwrap();
+        data.buffer_operation(DmlOperation::Write(w2), &catalog, &manager.handle(), &exec)
+            .await
+            .unwrap();

         let table_data = data.table_data("mem").unwrap();
         let table = table_data.read().await;
ingester/src/data/namespace.rs
@@ -16,12 +16,15 @@ use write_summary::ShardProgress;

 #[cfg(test)]
 use super::triggers::TestTriggers;
-use super::{partition::PersistingBatch, table::TableData};
+use super::{
+    partition::{resolver::CatalogPartitionResolver, PersistingBatch},
+    table::TableData,
+};
 use crate::lifecycle::LifecycleHandle;

 /// Data of a Namespace that belongs to a given Shard
 #[derive(Debug)]
-pub struct NamespaceData {
+pub(crate) struct NamespaceData {
     namespace_id: NamespaceId,

     /// The catalog ID of the shard this namespace is being populated from.
@@ -103,7 +106,7 @@ impl NamespaceData {
     pub(super) async fn buffer_operation(
         &self,
         dml_operation: DmlOperation,
-        catalog: &dyn Catalog,
+        catalog: &Arc<dyn Catalog>,
         lifecycle_handle: &dyn LifecycleHandle,
         executor: &Executor,
     ) -> Result<bool, super::Error> {
@@ -144,7 +147,6 @@ impl NamespaceData {
                         sequence_number,
                         b,
                         partition_key.clone(),
-                        catalog,
                         lifecycle_handle,
                     )
                     .await?;
@@ -166,7 +168,7 @@ impl NamespaceData {
                 let mut table_data = table_data.write().await;

                 table_data
-                    .buffer_delete(delete.predicate(), sequence_number, catalog, executor)
+                    .buffer_delete(delete.predicate(), sequence_number, &**catalog, executor)
                     .await?;

                 // don't pause writes since deletes don't count towards memory limits
@@ -233,7 +235,7 @@ impl NamespaceData {
     async fn insert_table(
         &self,
         table_name: &str,
-        catalog: &dyn Catalog,
+        catalog: &Arc<dyn Catalog>,
     ) -> Result<Arc<tokio::sync::RwLock<TableData>>, super::Error> {
         let mut repos = catalog.repositories().await;
         let info = repos
@@ -245,6 +247,9 @@ impl NamespaceData {

         let mut t = self.tables.write();

+        // TODO: share this server-wide
+        let partition_provider = Arc::new(CatalogPartitionResolver::new(Arc::clone(catalog)));
+
         let data = match t.entry(table_name.to_string()) {
             Entry::Vacant(v) => {
                 let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new(
@@ -252,6 +257,7 @@ impl NamespaceData {
                     table_name,
                     self.shard_id,
                     info.tombstone_max_sequence_number,
+                    partition_provider,
                 ))));
                 self.table_count.inc(1);
                 Arc::clone(v)
ingester/src/data/partition.rs
@@ -15,6 +15,7 @@ use self::buffer::{BufferBatch, DataBuffer};
 use crate::{data::query_dedup::query, query::QueryableBatch};

 mod buffer;
+pub mod resolver;

 /// Read only copy of the unpersisted data for a partition in the ingester for a specific partition.
 #[derive(Debug)]
@@ -285,6 +286,13 @@ impl PartitionData {
         self.max_persisted_sequence_number = Some(sequence_number);
         self.data.mark_persisted();
     }
+
+    /// Return the name of the table this [`PartitionData`] is buffering writes
+    /// for.
+    #[cfg(test)]
+    pub(crate) fn table_name(&self) -> &str {
+        self.table_name.as_ref()
+    }
 }

 #[cfg(test)]
ingester/src/data/partition/resolver/catalog.rs (new file)
@@ -0,0 +1,152 @@
+//! A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve
+//! the partition id and persist offset.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use backoff::{Backoff, BackoffConfig};
+use data_types::{Partition, PartitionKey, ShardId, TableId};
+use iox_catalog::interface::Catalog;
+use observability_deps::tracing::debug;
+
+use crate::data::partition::PartitionData;
+
+use super::r#trait::PartitionProvider;
+
+/// A [`PartitionProvider`] implementation that hits the [`Catalog`] to resolve
+/// the partition id and persist offset, returning an initialised
+/// [`PartitionData`].
+#[derive(Debug)]
+pub(crate) struct CatalogPartitionResolver {
+    catalog: Arc<dyn Catalog>,
+    backoff_config: BackoffConfig,
+}
+
+impl CatalogPartitionResolver {
+    /// Construct a [`CatalogPartitionResolver`] that looks up partitions in
+    /// `catalog`.
+    pub(crate) fn new(catalog: Arc<dyn Catalog>) -> Self {
+        Self {
+            catalog,
+            backoff_config: Default::default(),
+        }
+    }
+
+    async fn get(
+        &self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+    ) -> Result<Partition, iox_catalog::interface::Error> {
+        self.catalog
+            .repositories()
+            .await
+            .partitions()
+            .create_or_get(partition_key, shard_id, table_id)
+            .await
+    }
+}
+
+#[async_trait]
+impl PartitionProvider for CatalogPartitionResolver {
+    async fn get_partition(
+        &self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+        table_name: Arc<str>,
+    ) -> PartitionData {
+        debug!(
+            %partition_key,
+            %shard_id, %table_id, %table_name, "upserting partition in catalog"
+        );
+        let p = Backoff::new(&self.backoff_config)
+            .retry_all_errors("resolve partition", || {
+                self.get(partition_key.clone(), shard_id, table_id)
+            })
+            .await
+            .expect("retry forever");
+
+        PartitionData::new(
+            p.id,
+            shard_id,
+            table_id,
+            table_name,
+            p.persisted_sequence_number,
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use data_types::ShardIndex;
+
+    use super::*;
+
+    const TABLE_NAME: &str = "bananas";
+    const PARTITION_KEY: &str = "platanos";
+
+    #[tokio::test]
+    async fn test_resolver() {
+        let metrics = Arc::new(metric::Registry::default());
+        let catalog: Arc<dyn Catalog> =
+            Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
+
+        let (shard_id, table_id) = {
+            let mut repos = catalog.repositories().await;
+            let t = repos.topics().create_or_get("platanos").await.unwrap();
+            let q = repos.query_pools().create_or_get("platanos").await.unwrap();
+            let ns = repos
+                .namespaces()
+                .create(
+                    TABLE_NAME,
+                    iox_catalog::INFINITE_RETENTION_POLICY,
+                    t.id,
+                    q.id,
+                )
+                .await
+                .unwrap();
+
+            let shard = repos
+                .shards()
+                .create_or_get(&t, ShardIndex::new(0))
+                .await
+                .unwrap();
+
+            let table = repos
+                .tables()
+                .create_or_get(TABLE_NAME, ns.id)
+                .await
+                .unwrap();
+
+            (shard.id, table.id)
+        };
+
+        let table_name = TABLE_NAME.into();
+        let resolver = CatalogPartitionResolver::new(Arc::clone(&catalog));
+        let got = resolver
+            .get_partition(
+                PartitionKey::from(PARTITION_KEY),
+                shard_id,
+                table_id,
+                Arc::clone(&table_name),
+            )
+            .await;
+        assert_eq!(*got.table_name(), *table_name);
+        assert_eq!(got.max_persisted_sequence_number(), None);
+
+        let got = catalog
+            .repositories()
+            .await
+            .partitions()
+            .get_by_id(got.id)
+            .await
+            .unwrap()
+            .expect("partition not created");
+        assert_eq!(got.shard_id, shard_id);
+        assert_eq!(got.table_id, table_id);
+        assert_eq!(got.partition_key, PartitionKey::from(PARTITION_KEY));
+    }
+}
ingester/src/data/partition/resolver/mock.rs (new file)
@@ -0,0 +1,73 @@
+//! A mock [`PartitionProvider`] to inject [`PartitionData`] for tests.
+
+use std::{collections::HashMap, sync::Arc};
+
+use async_trait::async_trait;
+use data_types::{PartitionKey, ShardId, TableId};
+use parking_lot::Mutex;
+
+use crate::data::partition::PartitionData;
+
+use super::r#trait::PartitionProvider;
+
+/// A mock [`PartitionProvider`] for testing that returns pre-initialised
+/// [`PartitionData`] for configured `(key, shard, table)` triplets.
+#[derive(Debug, Default)]
+pub(crate) struct MockPartitionProvider {
+    partitions: Mutex<HashMap<(PartitionKey, ShardId, TableId), PartitionData>>,
+}
+
+impl MockPartitionProvider {
+    /// A builder helper for [`Self::insert()`].
+    #[must_use]
+    pub(crate) fn with_partition(
+        mut self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+        data: PartitionData,
+    ) -> Self {
+        self.insert(partition_key, shard_id, table_id, data);
+        self
+    }
+
+    /// Add `data` to the mock state, returning it when asked for the specified
+    /// `(key, shard, table)` triplet.
+    pub(crate) fn insert(
+        &mut self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+        data: PartitionData,
+    ) {
+        assert!(
+            self.partitions
+                .lock()
+                .insert((partition_key, shard_id, table_id), data)
+                .is_none(),
+            "overwriting an existing mock PartitionData"
+        );
+    }
+}
+
+#[async_trait]
+impl PartitionProvider for MockPartitionProvider {
+    async fn get_partition(
+        &self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+        table_name: Arc<str>,
+    ) -> PartitionData {
+        let p = self
+            .partitions
+            .lock()
+            .remove(&(partition_key.clone(), shard_id, table_id))
+            .unwrap_or_else(|| {
+                panic!("no partition data for mock ({partition_key:?}, {shard_id:?}, {table_id:?})")
+            });
+
+        assert_eq!(*p.table_name(), *table_name);
+        p
+    }
+}
ingester/src/data/partition/resolver/mod.rs (new file)
@@ -0,0 +1,14 @@
+//! An abstract resolver of [`PartitionData`] for a given shard & table.
+//!
+//! [`PartitionData`]: crate::data::partition::PartitionData
+
+mod r#trait;
+pub(crate) use r#trait::*;
+
+mod catalog;
+pub(crate) use catalog::*;
+
+#[cfg(test)]
+mod mock;
+#[cfg(test)]
+pub(crate) use mock::*;
ingester/src/data/partition/resolver/trait.rs (new file)
@@ -0,0 +1,71 @@
+use std::{fmt::Debug, sync::Arc};
+
+use async_trait::async_trait;
+use data_types::{PartitionKey, ShardId, TableId};
+
+use crate::data::partition::PartitionData;
+
+/// An infallible resolver of [`PartitionData`] for the specified shard, table,
+/// and partition key, returning an initialised [`PartitionData`] buffer for it.
+#[async_trait]
+pub(crate) trait PartitionProvider: Send + Sync + Debug {
+    async fn get_partition(
+        &self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+        table_name: Arc<str>,
+    ) -> PartitionData;
+}
+
+#[async_trait]
+impl<T> PartitionProvider for Arc<T>
+where
+    T: PartitionProvider,
+{
+    async fn get_partition(
+        &self,
+        partition_key: PartitionKey,
+        shard_id: ShardId,
+        table_id: TableId,
+        table_name: Arc<str>,
+    ) -> PartitionData {
+        (**self)
+            .get_partition(partition_key, shard_id, table_id, table_name)
+            .await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use data_types::PartitionId;
+
+    use crate::data::partition::resolver::MockPartitionProvider;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_arc_impl() {
+        let key = PartitionKey::from("bananas");
+        let shard_id = ShardId::new(42);
+        let table_id = TableId::new(24);
+        let table_name = "platanos".into();
+        let partition = PartitionId::new(4242);
+        let data = PartitionData::new(partition, shard_id, table_id, Arc::clone(&table_name), None);
+
+        let mock = Arc::new(MockPartitionProvider::default().with_partition(
+            key.clone(),
+            shard_id,
+            table_id,
+            data,
+        ));
+
+        let got = mock
+            .get_partition(key, shard_id, table_id, Arc::clone(&table_name))
+            .await;
+        assert_eq!(got.id(), partition);
+        assert_eq!(*got.table_name(), *table_name);
+    }
+}
ingester/src/data/shard.rs
@@ -58,14 +58,14 @@ impl ShardData {
     pub(super) async fn buffer_operation(
         &self,
         dml_operation: DmlOperation,
-        catalog: &dyn Catalog,
+        catalog: &Arc<dyn Catalog>,
         lifecycle_handle: &dyn LifecycleHandle,
         executor: &Executor,
     ) -> Result<bool, super::Error> {
         let namespace_data = match self.namespace(dml_operation.namespace()) {
             Some(d) => d,
             None => {
-                self.insert_namespace(dml_operation.namespace(), catalog)
+                self.insert_namespace(dml_operation.namespace(), &**catalog)
                     .await?
             }
         };
ingester/src/data/table.rs
@@ -9,7 +9,9 @@ use mutable_batch::MutableBatch;
 use snafu::ResultExt;
 use write_summary::ShardProgress;

-use super::partition::{PartitionData, PartitionStatus, UnpersistedPartitionData};
+use super::partition::{
+    resolver::PartitionProvider, PartitionData, PartitionStatus, UnpersistedPartitionData,
+};
 use crate::lifecycle::LifecycleHandle;

 /// Data of a Table in a given Namesapce that belongs to a given Shard
@@ -23,17 +25,32 @@ pub(crate) struct TableData {

     // the max sequence number for a tombstone associated with this table
     tombstone_max_sequence_number: Option<SequenceNumber>,

+    /// An abstract constructor of [`PartitionData`] instances for a given
+    /// `(key, shard, table)` triplet.
+    partition_provider: Arc<dyn PartitionProvider>,
+
     // Map pf partition key to its data
     pub(super) partition_data: BTreeMap<PartitionKey, PartitionData>,
 }

 impl TableData {
-    /// Initialize new table buffer
+    /// Initialize new table buffer identified by [`TableId`] in the catalog.
+    ///
+    /// Optionally the given tombstone max [`SequenceNumber`] identifies the
+    /// inclusive upper bound of tombstones associated with this table. Any data
+    /// greater than this value is guaranteed to not (yet) have a delete
+    /// tombstone that must be resolved.
+    ///
+    /// The partition provider is used to instantiate a [`PartitionData`]
+    /// instance when this [`TableData`] instance observes an op for a partition
+    /// for the first time.
     pub fn new(
         table_id: TableId,
         table_name: &str,
         shard_id: ShardId,
         tombstone_max_sequence_number: Option<SequenceNumber>,
+        partition_provider: Arc<dyn PartitionProvider>,
     ) -> Self {
         Self {
             table_id,
@@ -41,6 +58,7 @@ impl TableData {
             shard_id,
             tombstone_max_sequence_number,
             partition_data: Default::default(),
+            partition_provider,
         }
     }
@@ -66,14 +84,25 @@ impl TableData {
         sequence_number: SequenceNumber,
         batch: MutableBatch,
         partition_key: PartitionKey,
-        catalog: &dyn Catalog,
         lifecycle_handle: &dyn LifecycleHandle,
     ) -> Result<bool, super::Error> {
         let partition_data = match self.partition_data.get_mut(&partition_key) {
             Some(p) => p,
             None => {
-                self.insert_partition(partition_key.clone(), self.shard_id, catalog)
-                    .await?;
+                let p = self
+                    .partition_provider
+                    .get_partition(
+                        partition_key.clone(),
+                        self.shard_id,
+                        self.table_id,
+                        Arc::clone(&self.table_name),
+                    )
+                    .await;
+                // Add the partition to the map.
+                assert!(self
+                    .partition_data
+                    .insert(partition_key.clone(), p)
+                    .is_none());
                 self.partition_data.get_mut(&partition_key).unwrap()
             }
         };
@@ -149,34 +178,6 @@ impl TableData {
             .collect()
     }

-    async fn insert_partition(
-        &mut self,
-        partition_key: PartitionKey,
-        shard_id: ShardId,
-        catalog: &dyn Catalog,
-    ) -> Result<(), super::Error> {
-        let partition = catalog
-            .repositories()
-            .await
-            .partitions()
-            .create_or_get(partition_key, shard_id, self.table_id)
-            .await
-            .context(super::CatalogSnafu)?;
-
-        self.partition_data.insert(
-            partition.partition_key,
-            PartitionData::new(
-                partition.id,
-                shard_id,
-                self.table_id,
-                Arc::clone(&self.table_name),
-                partition.persisted_sequence_number,
-            ),
-        );
-
-        Ok(())
-    }
-
     /// Return progress from this Table
     pub(crate) fn progress(&self) -> ShardProgress {
         let progress = ShardProgress::new();
ingester/src/test_util.rs
@@ -696,7 +696,6 @@ pub async fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
     // Make data for one shard and two tables
     let shard_index = ShardIndex::new(1);
     let shard_id = populate_catalog(&*catalog).await;

     let mut shards = BTreeMap::new();
     shards.insert(
         shard_id,
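For illustration of the new wiring (this test is not part of the diff; the ids, partition key, and table name "mem" are made up), constructing a TableData against the mock resolver no longer involves a catalog handle at all:

    use std::sync::Arc;

    use data_types::{PartitionId, PartitionKey, ShardId, TableId};

    use crate::data::partition::{resolver::MockPartitionProvider, PartitionData};
    use crate::data::table::TableData;

    #[test]
    fn test_table_data_wiring() {
        let shard_id = ShardId::new(1);
        let table_id = TableId::new(1);
        let key = PartitionKey::from("2022-09-01");

        // Pre-load the mock with the PartitionData the table should resolve
        // on the first write observed for this partition key.
        let provider = Arc::new(MockPartitionProvider::default().with_partition(
            key.clone(),
            shard_id,
            table_id,
            PartitionData::new(PartitionId::new(0), shard_id, table_id, "mem".into(), None),
        ));

        // TableData defers to the injected provider when it first observes an
        // op for a partition; no catalog plumbing is required here.
        let _table = TableData::new(table_id, "mem", shard_id, None, provider);
    }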