Merge pull request #6068 from influxdata/dom/mutex-pushdown

perf(ingester): granular per-partition locking

commit 61b1a455dc
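This change removes the ingester's table-wide tokio::sync::RwLock<TableData> and pushes locking down to the partitions: each PartitionData now lives behind its own parking_lot::Mutex, handed out as an Arc, while a table-level RwLock guards only the lookup map. For orientation, here is a minimal sketch of the resulting shape, using hypothetical pared-down types rather than the crate's real API:

use std::{collections::HashMap, sync::Arc};
use parking_lot::{Mutex, RwLock};

// Hypothetical, pared-down types for illustration; the real TableData and
// PartitionData carry much more state.
#[derive(Default)]
struct PartitionData {
    rows: usize,
}

#[derive(Default)]
struct TableData {
    // The table-level lock now guards only the *map*, not the data...
    partitions: RwLock<HashMap<i64, Arc<Mutex<PartitionData>>>>,
}

impl TableData {
    fn buffer_write(&self, partition_id: i64) {
        // ...so it is held just long enough to clone an Arc out,
        let p = Arc::clone(self.partitions.write().entry(partition_id).or_default());
        // and each partition is then mutated under its own mutex, letting
        // writes to different partitions proceed concurrently.
        p.lock().rows += 1;
    }
}

fn main() {
    let table = TableData::default();
    table.buffer_write(1);
    table.buffer_write(2);
    assert_eq!(table.partitions.read()[&1].lock().rows, 1);
}

Because the map lock is dropped as soon as the Arc is cloned out, writes and persist operations touching different partitions of the same table no longer contend with each other.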
@@ -265,45 +265,46 @@ impl Persister for IngesterData {
         let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
             panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
         });
+        // Assert various properties of the table to ensure the index is
+        // correct, out of an abundance of caution.
+        assert_eq!(table_data.shard_id(), shard_id);
+        assert_eq!(table_data.namespace_id(), namespace_id);
+        assert_eq!(table_data.table_id(), table_id);
+        let table_name = table_data.table_name().clone();
+
+        let partition = table_data.get_partition(partition_id).unwrap_or_else(|| {
+            panic!(
+                "partition {partition_id} in table {table_id} in namespace {namespace_id} not in shard {shard_id} state"
+            )
+        });
 
-        let table_name;
         let partition_key;
         let sort_key;
         let last_persisted_sequence_number;
         let batch;
         let batch_sequence_number_range;
         {
-            let mut guard = table_data.write().await;
-            // Assert various properties of the table to ensure the index is
-            // correct, out of an abundance of caution.
-            assert_eq!(guard.shard_id(), shard_id);
-            assert_eq!(guard.namespace_id(), namespace_id);
-            assert_eq!(guard.table_id(), table_id);
-            table_name = guard.table_name().clone();
+            // Acquire a write lock over the partition and extract all the
+            // necessary data.
+            let mut guard = partition.lock();
 
-            let partition = guard.get_partition(partition_id).unwrap_or_else(|| {
-                panic!(
-                    "partition {partition_id} in table {table_id} in namespace {namespace_id} not in shard {shard_id} state"
-                )
-            });
-
             // Assert various properties of the partition to ensure the index is
             // correct, out of an abundance of caution.
-            assert_eq!(partition.partition_id(), partition_id);
-            assert_eq!(partition.shard_id(), shard_id);
-            assert_eq!(partition.namespace_id(), namespace_id);
-            assert_eq!(partition.table_id(), table_id);
-            assert_eq!(*partition.table_name(), table_name);
+            assert_eq!(guard.partition_id(), partition_id);
+            assert_eq!(guard.shard_id(), shard_id);
+            assert_eq!(guard.namespace_id(), namespace_id);
+            assert_eq!(guard.table_id(), table_id);
+            assert_eq!(*guard.table_name(), table_name);
 
-            partition_key = partition.partition_key().clone();
-            sort_key = partition.sort_key().clone();
-            last_persisted_sequence_number = partition.max_persisted_sequence_number();
+            partition_key = guard.partition_key().clone();
+            sort_key = guard.sort_key().clone();
+            last_persisted_sequence_number = guard.max_persisted_sequence_number();
 
             // The sequence number MUST be read without releasing the write lock
             // to ensure a consistent snapshot of batch contents and batch
             // sequence number range.
-            batch = partition.mark_persisting();
-            batch_sequence_number_range = partition.sequence_number_range();
+            batch = guard.mark_persisting();
+            batch_sequence_number_range = guard.sequence_number_range();
         };
 
         // From this point on, the code MUST be infallible.
@@ -423,12 +424,7 @@ impl Persister for IngesterData {
             .expect("retry forever");
 
         // Update the sort key in the partition cache.
-        table_data
-            .write()
-            .await
-            .get_partition(partition_id)
-            .unwrap()
-            .update_sort_key(Some(new_sort_key.clone()));
+        partition.lock().update_sort_key(Some(new_sort_key.clone()));
 
         debug!(
             %object_store_id,
@@ -545,18 +541,15 @@ impl Persister for IngesterData {
         // This SHOULD cause the data to be dropped, but there MAY be ongoing
         // queries that currently hold a reference to the data. In either case,
         // the persisted data will be dropped "shortly".
-        table_data
-            .write()
-            .await
-            .get_partition(partition_id)
-            .unwrap()
+        partition
+            .lock()
             .mark_persisted(iox_metadata.max_sequence_number);
 
         // BUG: ongoing queries retain references to the persisting data,
         // preventing it from being dropped, but memory is released back to
         // lifecycle memory tracker when this fn returns.
         //
-        // https://github.com/influxdata/influxdb_iox/issues/5872
+        // https://github.com/influxdata/influxdb_iox/issues/5805
         //
         info!(
             %object_store_id,
@@ -813,11 +806,12 @@ mod tests {
             let n = sd.namespace(&"foo".into()).unwrap();
             let mem_table = n.table_data(&"mem".into()).unwrap();
             assert!(n.table_data(&"mem".into()).is_some());
-            let mem_table = mem_table.write().await;
             let p = mem_table
                 .get_partition_by_key(&"1970-01-01".into())
-                .unwrap();
-            (mem_table.table_id(), p.partition_id())
+                .unwrap()
+                .lock()
+                .partition_id();
+            (mem_table.table_id(), p)
         };
 
         data.persist(shard1.id, namespace.id, table_id, partition_id)
@@ -968,13 +962,12 @@ mod tests {
             let mem_table = n.table_data(&"mem".into()).unwrap();
             assert!(n.table_data(&"cpu".into()).is_some());
 
-            let mem_table = mem_table.write().await;
             table_id = mem_table.table_id();
 
             let p = mem_table
                 .get_partition_by_key(&"1970-01-01".into())
                 .unwrap();
-            partition_id = p.partition_id();
+            partition_id = p.lock().partition_id();
         }
         {
             // verify the partition doesn't have a sort key before any data has been persisted
@@ -1044,13 +1037,12 @@ mod tests {
             .unwrap()
             .table_id(table_id)
             .unwrap()
-            .write()
-            .await
             .get_partition(partition_id)
             .unwrap()
+            .lock()
             .sort_key()
-            .get()
-            .await;
+            .clone();
+        let cached_sort_key = cached_sort_key.get().await;
         assert_eq!(
             cached_sort_key,
             Some(SortKey::from_columns(partition.sort_key))
@@ -1101,10 +1093,9 @@ mod tests {
         // verify that the parquet_max_sequence_number got updated
         assert_eq!(
             mem_table
-                .write()
-                .await
                 .get_partition(partition_id)
                 .unwrap()
+                .lock()
                 .max_persisted_sequence_number(),
             Some(SequenceNumber::new(2))
         );
@@ -1406,15 +1397,16 @@ mod tests {
             .await
             .unwrap();
         {
-            let table_data = data.table_data(&"mem".into()).unwrap();
-            let mut table = table_data.write().await;
+            let table = data.table_data(&"mem".into()).unwrap();
             assert!(table
-                .partition_iter_mut()
-                .all(|p| p.get_query_data().is_none()));
+                .partitions()
+                .into_iter()
+                .all(|p| p.lock().get_query_data().is_none()));
             assert_eq!(
                 table
                     .get_partition_by_key(&"1970-01-01".into())
                     .unwrap()
+                    .lock()
                     .max_persisted_sequence_number(),
                 Some(SequenceNumber::new(1))
             );
@@ -1426,11 +1418,10 @@ mod tests {
             .await
             .unwrap();
 
-        let table_data = data.table_data(&"mem".into()).unwrap();
-        let table = table_data.read().await;
+        let table = data.table_data(&"mem".into()).unwrap();
         let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
         assert_eq!(
-            partition.sequence_number_range().inclusive_min(),
+            partition.lock().sequence_number_range().inclusive_min(),
             Some(SequenceNumber::new(2))
         );
 
@@ -23,26 +23,26 @@ use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle};
 #[derive(Debug, Default)]
 struct DoubleRef {
     // TODO(4880): this can be removed when IDs are sent over the wire.
-    by_name: HashMap<TableName, Arc<tokio::sync::RwLock<TableData>>>,
-    by_id: HashMap<TableId, Arc<tokio::sync::RwLock<TableData>>>,
+    by_name: HashMap<TableName, Arc<TableData>>,
+    by_id: HashMap<TableId, Arc<TableData>>,
 }
 
 impl DoubleRef {
-    fn insert(&mut self, t: TableData) -> Arc<tokio::sync::RwLock<TableData>> {
+    fn insert(&mut self, t: TableData) -> Arc<TableData> {
         let name = t.table_name().clone();
         let id = t.table_id();
 
-        let t = Arc::new(tokio::sync::RwLock::new(t));
+        let t = Arc::new(t);
         self.by_name.insert(name, Arc::clone(&t));
         self.by_id.insert(id, Arc::clone(&t));
         t
     }
 
-    fn by_name(&self, name: &TableName) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
+    fn by_name(&self, name: &TableName) -> Option<Arc<TableData>> {
         self.by_name.get(name).map(Arc::clone)
     }
 
-    fn by_id(&self, id: TableId) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
+    fn by_id(&self, id: TableId) -> Option<Arc<TableData>> {
         self.by_id.get(&id).map(Arc::clone)
     }
 }
@@ -206,22 +206,19 @@ impl NamespaceData {
             None => self.insert_table(&t, catalog).await?,
         };
 
-        {
-            // lock scope
-            let mut table_data = table_data.write().await;
-            let action = table_data
-                .buffer_table_write(
-                    sequence_number,
-                    b,
-                    partition_key.clone(),
-                    lifecycle_handle,
-                )
-                .await?;
-            if let DmlApplyAction::Applied(should_pause) = action {
-                pause_writes = pause_writes || should_pause;
-                all_skipped = false;
-            }
-        }
+        let action = table_data
+            .buffer_table_write(
+                sequence_number,
+                b,
+                partition_key.clone(),
+                lifecycle_handle,
+            )
+            .await?;
+        if let DmlApplyAction::Applied(should_pause) = action {
+            pause_writes = pause_writes || should_pause;
+            all_skipped = false;
+        }
 
         #[cfg(test)]
         self.test_triggers.on_write().await;
     }
@@ -251,19 +248,13 @@ impl NamespaceData {
     }
 
     /// Return the specified [`TableData`] if it exists.
-    pub(crate) fn table_data(
-        &self,
-        table_name: &TableName,
-    ) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
+    pub(crate) fn table_data(&self, table_name: &TableName) -> Option<Arc<TableData>> {
         let t = self.tables.read();
         t.by_name(table_name)
     }
 
     /// Return the table data by ID.
-    pub(crate) fn table_id(
-        &self,
-        table_id: TableId,
-    ) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
+    pub(crate) fn table_id(&self, table_id: TableId) -> Option<Arc<TableData>> {
         let t = self.tables.read();
         t.by_id(table_id)
     }
@@ -273,7 +264,7 @@ impl NamespaceData {
         &self,
         table_name: &TableName,
         catalog: &Arc<dyn Catalog>,
-    ) -> Result<Arc<tokio::sync::RwLock<TableData>>, super::Error> {
+    ) -> Result<Arc<TableData>, super::Error> {
         let mut repos = catalog.repositories().await;
 
         let table_id = repos
@@ -317,7 +308,7 @@ impl NamespaceData {
             .actively_buffering(*self.buffering_sequence_number.read());
 
         for table_data in tables {
-            progress = progress.combine(table_data.read().await.progress())
+            progress = progress.combine(table_data.progress())
         }
         progress
     }
@@ -6,6 +6,7 @@ use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId
 use mutable_batch::MutableBatch;
 use observability_deps::tracing::*;
 use schema::sort::SortKey;
+use thiserror::Error;
 use write_summary::ShardProgress;
 
 use self::{
@@ -19,6 +20,18 @@ use super::{sequence_range::SequenceNumberRange, table::TableName};
 mod buffer;
 pub mod resolver;
 
+/// Errors that occur during DML operation buffering.
+#[derive(Debug, Error)]
+pub(crate) enum BufferError {
+    /// The op being applied has already been previously persisted.
+    #[error("skipped applying already persisted op")]
+    SkipPersisted,
+
+    /// An error occurred writing the data to the [`MutableBatch`].
+    #[error("failed to apply DML op: {0}")]
+    BufferError(#[from] mutable_batch::Error),
+}
+
 /// The load state of the [`SortKey`] for a given partition.
 #[derive(Debug, Clone)]
 pub(crate) enum SortKeyState {
@@ -107,25 +120,35 @@ impl PartitionData {
     /// Buffer the given [`MutableBatch`] in memory, ordered by the specified
     /// [`SequenceNumber`].
     ///
+    /// This method returns [`BufferError::SkipPersisted`] if `sequence_number`
+    /// falls in the range of previously persisted data (where `sequence_number`
+    /// is strictly less than the value of
+    /// [`Self::max_persisted_sequence_number()`]).
+    ///
     /// # Panics
     ///
     /// This method panics if `sequence_number` is not strictly greater than
-    /// previous calls or the persisted maximum.
+    /// previous calls. This is not enforced for writes before the persist mark.
     pub(super) fn buffer_write(
         &mut self,
         mb: MutableBatch,
         sequence_number: SequenceNumber,
-    ) -> Result<(), super::Error> {
-        // Ensure that this write is strictly after any persisted ops.
+    ) -> Result<(), BufferError> {
+        // Skip any ops that have already been applied.
         if let Some(min) = self.max_persisted_sequence_number {
-            assert!(sequence_number > min, "monotonicity violation");
+            if sequence_number <= min {
+                trace!(
+                    shard_id=%self.shard_id,
+                    op_sequence_number=?sequence_number,
+                    "skipping already-persisted write"
+                );
+                return Err(BufferError::SkipPersisted);
+            }
         }
 
         // Buffer the write, which ensures monotonicity of writes within the
         // buffer itself.
-        self.buffer
-            .buffer_write(mb, sequence_number)
-            .map_err(|e| super::Error::BufferWrite { source: e })?;
+        self.buffer.buffer_write(mb, sequence_number)?;
 
         trace!(
             shard_id = %self.shard_id,
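The buffering contract introduced above is easy to state in isolation. The following self-contained model is an assumption-laden sketch (u64 stands in for SequenceNumber, and all other PartitionData state is omitted) of the two rules the diff's doc comments describe: ops at or below the persist marker are skipped as already-persisted replays, while accepted writes must remain strictly monotonic.

// A minimal model of the replay-tolerant buffering rule; not the crate's code.
#[derive(Debug, PartialEq)]
enum BufferError {
    SkipPersisted,
}

#[derive(Default)]
struct Partition {
    max_persisted: Option<u64>, // persist marker, possibly restored at startup
    max_buffered: Option<u64>,  // highest op applied to the live buffer
}

impl Partition {
    fn buffer_write(&mut self, seq: u64) -> Result<(), BufferError> {
        // Skip any ops that have already been applied (replay tolerance).
        if let Some(min) = self.max_persisted {
            if seq <= min {
                return Err(BufferError::SkipPersisted);
            }
        }
        // Accepted writes must still be strictly monotonic within the buffer.
        if let Some(max) = self.max_buffered {
            assert!(seq > max, "monotonicity violation");
        }
        self.max_buffered = Some(seq);
        Ok(())
    }
}

fn main() {
    let mut p = Partition { max_persisted: Some(42), ..Default::default() };
    assert_eq!(p.buffer_write(1), Err(BufferError::SkipPersisted)); // replay, skipped
    assert_eq!(p.buffer_write(100), Ok(())); // first accepted write
    // p.buffer_write(50) would now panic: it is past the persist marker but
    // behind the maximum applied sequence number, which is a hard error.
}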
@@ -1081,8 +1104,9 @@ mod tests {
 
     // As above, the sequence numbers are not tracked between buffer instances.
     //
-    // This ensures that a write after a batch is persisted is still required to
-    // be monotonic.
+    // This test ensures that a partition can tolerate replayed ops prior to the
+    // persist marker when first initialising. However once a partition has
+    // buffered beyond the persist marker, it cannot re-buffer ops after it.
     #[tokio::test]
     #[should_panic(expected = "monotonicity violation")]
     async fn test_non_monotonic_writes_after_persistence() {
@@ -1105,13 +1129,26 @@ mod tests {
         p.mark_persisted(SequenceNumber::new(42));
 
         // This should fail as the write "goes backwards".
-        p.buffer_write(mb, SequenceNumber::new(1))
+        let err = p
+            .buffer_write(mb.clone(), SequenceNumber::new(1))
+            .expect_err("out of order write should succeed");
+
+        // This assert ensures replay is tolerated, with the previously
+        // persisted ops skipping instead of being applied.
+        assert_matches!(err, BufferError::SkipPersisted);
+
+        // Until a write is accepted.
+        p.buffer_write(mb.clone(), SequenceNumber::new(100))
             .expect("out of order write should succeed");
+
+        // At which point a write between the persist marker and the maximum
+        // applied sequence number is a hard error.
+        let _ = p.buffer_write(mb, SequenceNumber::new(50));
     }
 
-    // As above, but with a pre-configured persist marker.
+    // As above, but with a pre-configured persist marker greater than the
+    // sequence number being wrote.
     #[tokio::test]
-    #[should_panic(expected = "monotonicity violation")]
     async fn test_non_monotonic_writes_persist_marker() {
         let mut p = PartitionData::new(
             PARTITION_ID,
@@ -1131,8 +1168,11 @@ mod tests {
         let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
 
         // This should fail as the write "goes backwards".
-        p.buffer_write(mb, SequenceNumber::new(1))
-            .expect("out of order write should succeed");
+        let err = p
+            .buffer_write(mb, SequenceNumber::new(1))
+            .expect_err("out of order write should not succeed");
+
+        assert_matches!(err, BufferError::SkipPersisted);
     }
 
     // Restoring a persist marker is included in progress reports.
@@ -1,48 +1,48 @@
 //! Table level data buffer structures.
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
 use mutable_batch::MutableBatch;
-use observability_deps::tracing::*;
+use parking_lot::{Mutex, RwLock};
 use write_summary::ShardProgress;
 
 use super::{
-    partition::{resolver::PartitionProvider, PartitionData},
+    partition::{resolver::PartitionProvider, BufferError, PartitionData},
     DmlApplyAction,
 };
-use crate::lifecycle::LifecycleHandle;
+use crate::{arcmap::ArcMap, lifecycle::LifecycleHandle};
 
 /// A double-referenced map where [`PartitionData`] can be looked up by
 /// [`PartitionKey`], or ID.
 #[derive(Debug, Default)]
 struct DoubleRef {
     // TODO(4880): this can be removed when IDs are sent over the wire.
-    by_key: HashMap<PartitionKey, PartitionData>,
-    by_id: HashMap<PartitionId, PartitionKey>,
+    by_key: ArcMap<PartitionKey, Mutex<PartitionData>>,
+    by_id: ArcMap<PartitionId, Mutex<PartitionData>>,
 }
 
 impl DoubleRef {
-    fn insert(&mut self, ns: PartitionData) {
+    /// Try to insert the provided [`PartitionData`].
+    ///
+    /// Note that the partition MAY have been inserted concurrently, and the
+    /// returned [`PartitionData`] MAY be a different instance for the same
+    /// underlying partition.
+    fn try_insert(&mut self, ns: PartitionData) -> Arc<Mutex<PartitionData>> {
         let id = ns.partition_id();
         let key = ns.partition_key().clone();
 
-        assert!(self.by_key.insert(key.clone(), ns).is_none());
-        assert!(self.by_id.insert(id, key).is_none());
+        let ns = Arc::new(Mutex::new(ns));
+        self.by_key.get_or_insert_with(&key, || Arc::clone(&ns));
+        self.by_id.get_or_insert_with(&id, || ns)
     }
 
-    #[cfg(test)]
-    fn by_key(&self, key: &PartitionKey) -> Option<&PartitionData> {
+    fn by_key(&self, key: &PartitionKey) -> Option<Arc<Mutex<PartitionData>>> {
         self.by_key.get(key)
     }
 
-    fn by_key_mut(&mut self, key: &PartitionKey) -> Option<&mut PartitionData> {
-        self.by_key.get_mut(key)
-    }
-
-    fn by_id_mut(&mut self, id: PartitionId) -> Option<&mut PartitionData> {
-        let key = self.by_id.get(&id)?.clone();
-        self.by_key_mut(&key)
+    fn by_id(&self, id: PartitionId) -> Option<Arc<Mutex<PartitionData>>> {
+        self.by_id.get(&id)
     }
 }
 
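ArcMap comes from the internal crate::arcmap module, which this diff does not show. A rough stand-in for the semantics try_insert relies on is sketched below (a hypothetical implementation, only the behaviour matters): get_or_insert_with makes insertion race-tolerant, so when two writers race on one key the first insert wins and both callers receive the winning instance.

use std::{collections::HashMap, hash::Hash, sync::Arc};
use parking_lot::RwLock;

// Assumed stand-in for the internal ArcMap; the real helper likely differs.
struct ArcMap<K, V>(RwLock<HashMap<K, Arc<V>>>);

impl<K: Eq + Hash + Clone, V> ArcMap<K, V> {
    fn new() -> Self {
        Self(RwLock::new(HashMap::new()))
    }

    fn get(&self, key: &K) -> Option<Arc<V>> {
        self.0.read().get(key).map(Arc::clone)
    }

    fn get_or_insert_with(&self, key: &K, init: impl FnOnce() -> Arc<V>) -> Arc<V> {
        // Fast path: take only the read lock if the key already exists.
        if let Some(v) = self.get(key) {
            return v;
        }
        // Miss path: insert under the write lock; a racing winner is kept.
        let mut map = self.0.write();
        Arc::clone(map.entry(key.clone()).or_insert_with(init))
    }
}

fn main() {
    let map: ArcMap<&str, parking_lot::Mutex<i32>> = ArcMap::new();
    let a = map.get_or_insert_with(&"p1", || Arc::new(parking_lot::Mutex::new(0)));
    // A concurrent caller observes the same instance, not its own `init` value.
    let b = map.get_or_insert_with(&"p1", || Arc::new(parking_lot::Mutex::new(9)));
    assert!(Arc::ptr_eq(&a, &b));
    *a.lock() += 1;
    assert_eq!(*b.lock(), 1);
}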
@@ -103,7 +103,7 @@ pub(crate) struct TableData {
     partition_provider: Arc<dyn PartitionProvider>,
 
     // Map of partition key to its data
-    partition_data: DoubleRef,
+    partition_data: RwLock<DoubleRef>,
 }
 
 impl TableData {
@@ -137,13 +137,14 @@ impl TableData {
     // buffers the table write and returns true if the lifecycle manager indicates that
     // ingest should be paused.
     pub(super) async fn buffer_table_write(
-        &mut self,
+        &self,
         sequence_number: SequenceNumber,
         batch: MutableBatch,
         partition_key: PartitionKey,
         lifecycle_handle: &dyn LifecycleHandle,
     ) -> Result<DmlApplyAction, super::Error> {
-        let partition_data = match self.partition_data.by_key.get_mut(&partition_key) {
+        let p = self.partition_data.read().by_key(&partition_key);
+        let partition_data = match p {
             Some(p) => p,
             None => {
                 let p = self
@@ -157,26 +158,25 @@ impl TableData {
                     )
                     .await;
                 // Add the double-referenced partition to the map.
-                self.partition_data.insert(p);
-                self.partition_data.by_key_mut(&partition_key).unwrap()
+                //
+                // This MAY return a different instance than `p` if another
+                // thread has already initialised the partition.
+                self.partition_data.write().try_insert(p)
             }
         };
 
-        // skip the write if it has already been persisted
-        if let Some(max) = partition_data.max_persisted_sequence_number() {
-            if max >= sequence_number {
-                trace!(
-                    shard_id=%self.shard_id,
-                    op_sequence_number=?sequence_number,
-                    "skipping already-persisted write"
-                );
-                return Ok(DmlApplyAction::Skipped);
-            }
-        }
-
         let size = batch.size();
         let rows = batch.rows();
-        partition_data.buffer_write(batch, sequence_number)?;
+        let partition_id = {
+            let mut p = partition_data.lock();
+            match p.buffer_write(batch, sequence_number) {
+                Ok(_) => p.partition_id(),
+                Err(BufferError::SkipPersisted) => return Ok(DmlApplyAction::Skipped),
+                Err(BufferError::BufferError(e)) => {
+                    return Err(super::Error::BufferWrite { source: e })
+                }
+            }
+        };
 
         // Record the write as having been buffered.
         //
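Note the shape of the lock scope above: the partition mutex is held only while the batch is buffered, and the partition_id needed later is copied out so the guard drops before the lifecycle manager runs. A toy sketch of the same pattern, with hypothetical stand-in types:

use std::sync::Arc;
use parking_lot::Mutex;

#[derive(Default)]
struct PartitionData {
    id: i64,
    buffered: Vec<String>,
}

fn lifecycle_log_write(partition_id: i64) {
    // Stand-in for the lifecycle manager's accounting call.
    println!("buffered a write for partition {partition_id}");
}

fn buffer_table_write(partition: &Arc<Mutex<PartitionData>>, batch: String) {
    let partition_id = {
        let mut p = partition.lock();
        p.buffered.push(batch); // mutate under the per-partition mutex
        p.id // copy the ID out; the guard drops at the end of this block
    };
    // The mutex is no longer held here, so other writers to this partition
    // are not blocked while the slower bookkeeping runs.
    lifecycle_log_write(partition_id);
}

fn main() {
    let p = Arc::new(Mutex::new(PartitionData { id: 7, ..Default::default() }));
    buffer_table_write(&p, "cpu,host=a v=1".to_string());
}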
@@ -184,7 +184,7 @@ impl TableData {
         // op may fail which would lead to a write being recorded, but not
         // applied.
         let should_pause = lifecycle_handle.log_write(
-            partition_data.partition_id(),
+            partition_id,
             self.shard_id,
             self.namespace_id,
             self.table_id,
@@ -202,19 +202,17 @@ impl TableData {
     ///
     /// The order of [`PartitionData`] in the iterator is arbitrary and should
     /// not be relied upon.
-    pub(crate) fn partition_iter_mut(
-        &mut self,
-    ) -> impl Iterator<Item = &mut PartitionData> + ExactSizeIterator {
-        self.partition_data.by_key.values_mut()
+    pub(crate) fn partitions(&self) -> Vec<Arc<Mutex<PartitionData>>> {
+        self.partition_data.read().by_key.values()
     }
 
     /// Return the [`PartitionData`] for the specified ID.
     #[allow(unused)]
     pub(crate) fn get_partition(
-        &mut self,
+        &self,
         partition_id: PartitionId,
-    ) -> Option<&mut PartitionData> {
-        self.partition_data.by_id_mut(partition_id)
+    ) -> Option<Arc<Mutex<PartitionData>>> {
+        self.partition_data.read().by_id(partition_id)
     }
 
     /// Return the [`PartitionData`] for the specified partition key.
@@ -222,17 +220,19 @@ impl TableData {
     pub(crate) fn get_partition_by_key(
         &self,
         partition_key: &PartitionKey,
-    ) -> Option<&PartitionData> {
-        self.partition_data.by_key(partition_key)
+    ) -> Option<Arc<Mutex<PartitionData>>> {
+        self.partition_data.read().by_key(partition_key)
     }
 
     /// Return progress from this Table
     pub(super) fn progress(&self) -> ShardProgress {
         self.partition_data
+            .read()
             .by_key
             .values()
-            .fold(Default::default(), |progress, partition_data| {
-                progress.combine(partition_data.progress())
+            .into_iter()
+            .fold(Default::default(), |progress, p| {
+                progress.combine(p.lock().progress())
             })
     }
 
@@ -310,7 +310,7 @@ mod tests {
         ),
     ));
 
-    let mut table = TableData::new(
+    let table = TableData::new(
         table_id,
         TABLE_NAME.into(),
         shard_id,
@@ -324,8 +324,12 @@ mod tests {
         .unwrap();
 
         // Assert the table does not contain the test partition
-        assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
-        assert!(table.partition_data.by_id_mut(PARTITION_ID).is_none());
+        assert!(table
+            .partition_data
+            .read()
+            .by_key(&PARTITION_KEY.into())
+            .is_none());
+        assert!(table.partition_data.read().by_id(PARTITION_ID).is_none());
 
         // Write some test data
         let action = table
@@ -340,8 +344,12 @@ mod tests {
         assert_matches!(action, DmlApplyAction::Applied(false));
 
         // Referencing the partition should succeed
-        assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
-        assert!(table.partition_data.by_id_mut(PARTITION_ID).is_some());
+        assert!(table
+            .partition_data
+            .read()
+            .by_key(&PARTITION_KEY.into())
+            .is_some());
+        assert!(table.partition_data.read().by_id(PARTITION_ID).is_some());
     }
 
     #[tokio::test]
@@ -369,7 +377,7 @@ mod tests {
         ),
     ));
 
-    let mut table = TableData::new(
+    let table = TableData::new(
         table_id,
         TABLE_NAME.into(),
         shard_id,
@@ -387,7 +395,11 @@ mod tests {
         let handle = MockLifecycleHandle::default();
 
         // Assert the table does not contain the test partition
-        assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none());
+        assert!(table
+            .partition_data
+            .read()
+            .by_key(&PARTITION_KEY.into())
+            .is_none());
 
         // Write some test data
         let action = table
@@ -402,7 +414,11 @@ mod tests {
         assert_matches!(action, DmlApplyAction::Applied(false));
 
         // Referencing the partition should succeed
-        assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some());
+        assert!(table
+            .partition_data
+            .read()
+            .by_key(&PARTITION_KEY.into())
+            .is_some());
 
         // And the lifecycle handle was called with the expected values
         assert_eq!(
@@ -311,10 +311,11 @@ pub async fn prepare_data_to_querier(
     // acquire locks and read table data in parallel
     let unpersisted_partitions: Vec<_> = futures::stream::iter(table_refs)
         .map(|table_data| async move {
-            let mut table_data = table_data.write().await;
             table_data
-                .partition_iter_mut()
+                .partitions()
+                .into_iter()
                 .map(|p| {
+                    let mut p = p.lock();
                     (
                         p.partition_id(),
                         p.get_query_data(),
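In this query path the partition mutex is taken inside the synchronous map closure, never across the surrounding async code. That is deliberate: parking_lot guards are not Send, so they cannot be held across an .await in a spawned future; snapshotting what is needed keeps every guard's lifetime purely synchronous. A minimal illustration (toy types, assumes a tokio dependency):

use std::sync::Arc;
use parking_lot::Mutex;

#[tokio::main]
async fn main() {
    let partitions: Vec<Arc<Mutex<i64>>> =
        (0..3).map(|n| Arc::new(Mutex::new(n))).collect();

    // Guards are acquired and dropped inside the synchronous closure, so
    // none is alive when the future next yields.
    let snapshot: Vec<i64> = partitions.iter().map(|p| *p.lock()).collect();

    tokio::task::yield_now().await; // safe: no guard held across the await
    assert_eq!(snapshot, vec![0, 1, 2]);
}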