Merge pull request #6293 from influxdata/dom/remove-ordering

refactor(ingester2): document reordering / remove seqnum ranges
pull/24376/head
kodiakhq[bot] 2022-12-01 12:44:40 +00:00 committed by GitHub
commit 8515b770e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 119 additions and 566 deletions

View File

@ -240,7 +240,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));

View File

@ -9,9 +9,7 @@ use schema::sort::SortKey;
use self::buffer::{traits::Queryable, BufferState, DataBuffer, Persisting};
use super::table::TableName;
use crate::{
deferred_load::DeferredLoad, query_adaptor::QueryAdaptor, sequence_range::SequenceNumberRange,
};
use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor};
mod buffer;
pub(crate) mod resolver;
@ -63,15 +61,11 @@ pub(crate) struct PartitionData {
/// / deferred.
table_name: Arc<DeferredLoad<TableName>>,
/// A buffer for incoming writes.
/// A [`DataBuffer`] for incoming writes.
buffer: DataBuffer,
/// The currently persisting [`DataBuffer`], if any.
persisting: Option<BufferState<Persisting>>,
/// The max_persisted_sequence number for any parquet_file in this
/// partition.
max_persisted_sequence_number: Option<SequenceNumber>,
}
impl PartitionData {
@ -84,7 +78,6 @@ impl PartitionData {
table_id: TableId,
table_name: Arc<DeferredLoad<TableName>>,
sort_key: SortKeyState,
max_persisted_sequence_number: Option<SequenceNumber>,
) -> Self {
Self {
partition_id: id,
@ -95,29 +88,16 @@ impl PartitionData {
table_name,
buffer: DataBuffer::default(),
persisting: None,
max_persisted_sequence_number,
}
}
/// Buffer the given [`MutableBatch`] in memory, ordered by the specified
/// [`SequenceNumber`].
///
/// # Panics
///
/// This method panics if `sequence_number` is not strictly greater than
/// previous calls. This is not enforced for writes before the persist mark.
/// Buffer the given [`MutableBatch`] in memory.
pub(super) fn buffer_write(
&mut self,
mb: MutableBatch,
sequence_number: SequenceNumber,
) -> Result<(), mutable_batch::Error> {
// Skip any ops that have already been applied.
if let Some(min) = self.max_persisted_sequence_number {
assert!(sequence_number > min, "monotonicity violation");
}
// Buffer the write, which ensures monotonicity of writes within the
// buffer itself.
// Buffer the write.
self.buffer.buffer_write(mb, sequence_number)?;
trace!(
@ -126,16 +106,14 @@ impl PartitionData {
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
min_sequence_number=?self.buffer.sequence_number_range().inclusive_min(),
max_sequence_number=?self.buffer.sequence_number_range().inclusive_max(),
"buffered write"
);
Ok(())
}
/// Return all data for this partition, ordered by the [`SequenceNumber`]
/// from which it was buffered with.
/// Return all data for this partition, ordered by the calls to
/// [`PartitionData::buffer_write()`].
pub(crate) fn get_query_data(&mut self) -> Option<QueryAdaptor> {
// Extract the buffered data, if any.
let buffered_data = self.buffer.get_query_data();
@ -158,9 +136,6 @@ impl PartitionData {
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
min_sequence_number=?self.buffer.sequence_number_range().inclusive_min(),
max_sequence_number=?self.buffer.sequence_number_range().inclusive_max(),
max_persisted=?self.max_persisted_sequence_number(),
n_batches = data.len(),
"read partition data"
);
@ -178,18 +153,6 @@ impl PartitionData {
Some(QueryAdaptor::new(self.partition_id, data))
}
/// Return the range of [`SequenceNumber`] currently queryable by calling
/// [`PartitionData::get_query_data()`].
///
/// This includes buffered data, snapshots, and currently persisting data.
pub(crate) fn sequence_number_range(&self) -> SequenceNumberRange {
self.persisting
.as_ref()
.map(|v| v.sequence_number_range().clone())
.unwrap_or_default()
.merge(self.buffer.sequence_number_range())
}
/// Snapshot and mark all buffered data as persisting.
///
/// This method returns [`None`] if no data is buffered in [`Self`].
@ -226,9 +189,6 @@ impl PartitionData {
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
current_max_persisted_sequence_number = ?self.max_persisted_sequence_number,
persisting_min_sequence_number = ?persisting.sequence_number_range().inclusive_min(),
persisting_max_sequence_number = ?persisting.sequence_number_range().inclusive_max(),
"marking partition as persisting"
);
@ -238,80 +198,34 @@ impl PartitionData {
Some(QueryAdaptor::new(self.partition_id, data))
}
/// Mark this partition as having completed persistence up to, and
/// including, the specified [`SequenceNumber`].
/// Mark this partition as having completed persistence of the specified
/// `batch`.
///
/// All references to actively persisting are released.
/// All internal references to the data in `batch` are released.
///
/// # Panics
///
/// This method panics if [`Self`] is not marked as undergoing a persist
/// operation. All calls to [`Self::mark_persisted()`] must be preceded by a
/// matching call to [`Self::mark_persisting()`].
pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) {
// Assert there is a batch marked as persisting in self, that it has a
// non-empty sequence number range, and that the persisted upper bound
// matches the data in the batch being dropped.
//
// TODO: once this has been deployed without issue (the assert does not
// fire), passing the sequence number is redundant and can be removed.
let persisting_max = self
.persisting
.as_ref()
.expect("must be a persisting batch when marking complete")
.sequence_number_range()
.inclusive_max()
.expect("persisting batch must contain sequence numbers");
assert_eq!(
persisting_max, sequence_number,
"marking {:?} as persisted but persisting batch max is {:?}",
sequence_number, persisting_max
/// operation, `batch` is not currently being persisted, or `batch` is
/// persisted out-of-order w.r.t other persisting batches. All calls to
/// [`Self::mark_persisted()`] must be preceded by a matching call to
/// [`Self::mark_persisting()`].
pub(crate) fn mark_persisted(&mut self, _batch: QueryAdaptor) {
// It is an invariant that partitions are persisted in order, as
// queriers consider writes in the object store as being strictly after
// writes returned from an ingester.
assert!(
self.persisting.is_some(),
"must be a persisting batch when marking complete"
);
// Additionally assert the persisting batch is ordered strictly before
// the data in the buffer, if any.
//
// This asserts writes are monotonically applied.
if let Some(buffer_min) = self.buffer.sequence_number_range().inclusive_min() {
assert!(persisting_max < buffer_min, "monotonicity violation");
}
// It is an invariant that partitions are persisted in order so that
// both the per-partition watermarks are correctly advanced and
// accurate.
if let Some(last_persist) = self.max_persisted_sequence_number() {
assert!(
sequence_number > last_persist,
"out of order partition persistence, persisting {}, previously persisted {}",
sequence_number.get(),
last_persist.get(),
);
}
self.max_persisted_sequence_number = Some(sequence_number);
self.persisting = None;
debug!(
namespace_id = %self.namespace_id,
table_id = %self.table_id,
table_name = %self.table_name,
partition_id = %self.partition_id,
partition_key = %self.partition_key,
current_max_persisted_sequence_number = ?self.max_persisted_sequence_number,
"marking partition persistence complete"
);
}
pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
}
/// Return the [`SequenceNumber`] that forms the (inclusive) persistence
/// watermark for this partition.
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
self.max_persisted_sequence_number
}
/// Return the name of the table this [`PartitionData`] is buffering writes
/// for.
pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> {
@ -392,16 +306,8 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
// No writes should report no sequence offsets.
{
let range = p.sequence_number_range();
assert_eq!(range.inclusive_min(), None);
assert_eq!(range.inclusive_max(), None);
}
// And no data should be returned when queried.
assert!(p.get_query_data().is_none());
@ -410,13 +316,6 @@ mod tests {
p.buffer_write(mb, SequenceNumber::new(1))
.expect("write should succeed");
// The sequence range should now cover the single write.
{
let range = p.sequence_number_range();
assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1)));
assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(1)));
}
// The data should be readable.
{
let data = p.get_query_data().expect("should return data");
@ -446,13 +345,6 @@ mod tests {
p.buffer_write(mb, SequenceNumber::new(2))
.expect("write should succeed");
// The sequence range should now cover both writes.
{
let range = p.sequence_number_range();
assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1)));
assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(2)));
}
// And finally both writes should be readable.
{
let data = p.get_query_data().expect("should contain data");
@ -491,10 +383,8 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
assert!(p.max_persisted_sequence_number().is_none());
assert!(p.get_query_data().is_none());
// Perform a single write.
@ -524,16 +414,6 @@ mod tests {
.collect::<Vec<_>>()
);
// The sequence range should now cover the single persisting write.
{
let range = p.sequence_number_range();
assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1)));
assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(1)));
}
// And the max_persisted_sequence_number should not have changed.
assert!(p.max_persisted_sequence_number().is_none());
// Buffer another write during an ongoing persist.
let mb = lp_to_mutable_batch(r#"bananas,city=Madrid people=4,pigeons="none" 20"#).1;
p.buffer_write(mb, SequenceNumber::new(2))
@ -563,33 +443,8 @@ mod tests {
);
}
// The sequence range should still cover both writes.
{
let range = p.sequence_number_range();
assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(1)));
assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(2)));
}
// And the max_persisted_sequence_number should not have changed.
assert!(p.max_persisted_sequence_number().is_none());
// The persist now "completes".
p.mark_persisted(SequenceNumber::new(1));
// The sequence range should now cover only the second remaining
// buffered write.
{
let range = p.sequence_number_range();
assert_eq!(range.inclusive_min(), Some(SequenceNumber::new(2)));
assert_eq!(range.inclusive_max(), Some(SequenceNumber::new(2)));
}
// And the max_persisted_sequence_number should reflect the completed
// persist op.
assert_eq!(
p.max_persisted_sequence_number(),
Some(SequenceNumber::new(1))
);
p.mark_persisted(persisting_data);
// Querying the buffer should now return only the second write.
{
@ -658,7 +513,6 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
// Perform the initial write.
@ -701,21 +555,20 @@ mod tests {
.await;
// Begin persisting the data, moving the buffer to the persisting state.
{
let batches = p.mark_persisting().unwrap();
assert_eq!(batches.record_batches().len(), 1);
assert_deduped(
&[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 2 |",
"+--------------------------------+---+",
],
batches,
)
.await;
}
let persisting_data = p.mark_persisting().unwrap();
assert_eq!(persisting_data.record_batches().len(), 1);
assert_deduped(
&[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 2 |",
"+--------------------------------+---+",
],
persisting_data.clone(),
)
.await;
// Buffer another write, and generate a snapshot by querying it.
let mb = lp_to_mutable_batch(r#"bananas x=3 42"#).1;
@ -736,11 +589,7 @@ mod tests {
.await;
// Finish persisting.
p.mark_persisted(SequenceNumber::new(2));
assert_eq!(
p.max_persisted_sequence_number(),
Some(SequenceNumber::new(2))
);
p.mark_persisted(persisting_data);
// And assert the correct value remains.
assert_eq!(p.get_query_data().unwrap().record_batches().len(), 1);
@ -772,7 +621,6 @@ mod tests {
TableName::from("platanos")
})),
starting_state,
None,
);
let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
@ -829,7 +677,6 @@ mod tests {
TableName::from("platanos")
})),
starting_state,
None,
);
let want = Some(SortKey::from_columns(["banana", "platanos", "time"]));
@ -841,7 +688,6 @@ mod tests {
// Perform writes with non-monotonic sequence numbers.
#[tokio::test]
#[should_panic(expected = "monotonicity violation")]
async fn test_non_monotonic_writes() {
let mut p = PartitionData::new(
PARTITION_ID,
@ -852,14 +698,37 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
// Perform out of order writes.
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb.clone(), SequenceNumber::new(2))
.expect("write should succeed");
let _ = p.buffer_write(mb, SequenceNumber::new(1));
p.buffer_write(
lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1,
SequenceNumber::new(2),
)
.expect("write should succeed");
let _ = p.buffer_write(
lp_to_mutable_batch(r#"bananas,city=Madrid people=2,pigeons="none" 11"#).1,
SequenceNumber::new(1),
);
// Nothing should explode, data should be readable.
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------+--------+----------+--------------------------------+",
"| city | people | pigeons | time |",
"+--------+--------+----------+--------------------------------+",
"| London | 2 | millions | 1970-01-01T00:00:00.000000010Z |",
"| Madrid | 2 | none | 1970-01-01T00:00:00.000000011Z |",
"+--------+--------+----------+--------------------------------+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
}
#[tokio::test]
@ -874,10 +743,16 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
p.mark_persisted(SequenceNumber::new(1));
// Write some data
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb, SequenceNumber::new(2))
.expect("write should succeed");
let not_persisting = p.get_query_data().unwrap();
p.mark_persisted(not_persisting);
}
#[tokio::test]
@ -891,7 +766,6 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
assert!(p.mark_persisting().is_none());
@ -909,7 +783,6 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
@ -921,129 +794,6 @@ mod tests {
p.mark_persisting();
}
#[tokio::test]
#[should_panic(
expected = "marking SequenceNumber(42) as persisted but persisting batch max is SequenceNumber(2)"
)]
async fn test_mark_persisted_wrong_sequence_number() {
let mut p = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.clone(),
NamespaceId::new(3),
TableId::new(4),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb, SequenceNumber::new(2))
.expect("write should succeed");
assert!(p.mark_persisting().is_some());
p.mark_persisted(SequenceNumber::new(42));
}
// Because persisting moves the data out of the "hot" buffer, the sequence
// numbers are not validated as being monotonic (the new buffer has no
// sequence numbers to compare against).
//
// Instead this check is performed when marking the persist op as complete.
#[tokio::test]
#[should_panic(expected = "monotonicity violation")]
async fn test_non_monotonic_writes_with_persistence() {
let mut p = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.clone(),
NamespaceId::new(3),
TableId::new(4),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb.clone(), SequenceNumber::new(42))
.expect("write should succeed");
assert!(p.mark_persisting().is_some());
// This succeeds due to a new buffer being in place that cannot track
// previous sequence numbers.
p.buffer_write(mb, SequenceNumber::new(1))
.expect("out of order write should succeed");
// The assert on non-monotonic writes moves to here instead.
p.mark_persisted(SequenceNumber::new(42));
}
// As above, the sequence numbers are not tracked between buffer instances.
//
// This test ensures that a partition can tolerate replayed ops prior to the
// persist marker when first initialising. However once a partition has
// buffered beyond the persist marker, it cannot re-buffer ops after it.
#[tokio::test]
#[should_panic(expected = "monotonicity violation")]
async fn test_non_monotonic_writes_after_persistence() {
let mut p = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.clone(),
NamespaceId::new(3),
TableId::new(4),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
p.buffer_write(mb.clone(), SequenceNumber::new(42))
.expect("write should succeed");
assert!(p.mark_persisting().is_some());
p.mark_persisted(SequenceNumber::new(42));
// This should fail as the write "goes backwards".
let _err = p
.buffer_write(mb, SequenceNumber::new(1))
.expect_err("out of order write should succeed");
}
// As above, but with a pre-configured persist marker greater than the
// sequence number being wrote.
#[tokio::test]
#[should_panic(expected = "monotonicity violation")]
async fn test_non_monotonic_writes_persist_marker() {
let mut p = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.clone(),
NamespaceId::new(3),
TableId::new(4),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
Some(SequenceNumber::new(42)),
);
assert_eq!(
p.max_persisted_sequence_number(),
Some(SequenceNumber::new(42))
);
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
// This should fail as the write "goes backwards".
let _err = p
.buffer_write(mb, SequenceNumber::new(1))
.expect_err("out of order write should not succeed");
}
// Ensure an empty PartitionData does not panic due to constructing an empty
// QueryAdaptor.
#[tokio::test]
@ -1057,7 +807,6 @@ mod tests {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
Some(SequenceNumber::new(42)),
);
assert!(p.get_query_data().is_none());

View File

@ -12,7 +12,6 @@ pub(crate) mod traits;
pub(crate) use state_machine::*;
use self::{always_some::AlwaysSome, traits::Queryable};
use crate::sequence_range::SequenceNumberRange;
/// The current state of the [`BufferState`] state machine.
///
@ -32,16 +31,6 @@ impl Default for FsmState {
}
}
impl FsmState {
/// Return the current range of writes in the [`BufferState`] state machine,
/// if any.
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
match self {
Self::Buffering(v) => v.sequence_number_range(),
}
}
}
/// A helper wrapper over the [`BufferState`] FSM to abstract the caller from
/// state transitions during reads and writes from the underlying buffer.
#[derive(Debug, Default)]
@ -49,19 +38,8 @@ impl FsmState {
pub(crate) struct DataBuffer(AlwaysSome<FsmState>);
impl DataBuffer {
/// Return the range of [`SequenceNumber`] currently queryable by calling
/// [`Self::get_query_data()`].
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
self.0.sequence_number_range()
}
/// Buffer the given [`MutableBatch`] in memory, ordered by the specified
/// [`SequenceNumber`].
///
/// # Panics
///
/// This method panics if `sequence_number` is not strictly greater than
/// previous calls.
pub(crate) fn buffer_write(
&mut self,
mb: MutableBatch,

View File

@ -13,7 +13,6 @@ pub(in crate::buffer_tree::partition::buffer) use buffering::*;
pub(crate) use persisting::*;
use super::traits::{Queryable, Writeable};
use crate::sequence_range::SequenceNumberRange;
/// A result type for fallible transitions.
///
@ -30,11 +29,8 @@ pub(crate) enum Transition<A, B> {
impl<A, B> Transition<A, B> {
/// A helper function to construct [`Self::Ok`] variants.
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Self {
Self::Ok(BufferState {
state: v,
sequence_range,
})
pub(super) fn ok(v: A) -> Self {
Self::Ok(BufferState { state: v })
}
/// A helper function to construct [`Self::Unchanged`] variants.
@ -71,7 +67,6 @@ impl<A, B> Transition<A, B> {
#[derive(Debug)]
pub(crate) struct BufferState<T> {
state: T,
sequence_range: SequenceNumberRange,
}
impl BufferState<Buffering> {
@ -79,17 +74,10 @@ impl BufferState<Buffering> {
pub(super) fn new() -> Self {
Self {
state: Buffering::default(),
sequence_range: SequenceNumberRange::default(),
}
}
}
impl<T> BufferState<T> {
pub(crate) fn sequence_number_range(&self) -> &SequenceNumberRange {
&self.sequence_range
}
}
/// A [`BufferState`] in a mutable state can accept writes and record their
/// [`SequenceNumber`].
impl<T> BufferState<T>
@ -105,10 +93,9 @@ where
pub(crate) fn write(
&mut self,
batch: MutableBatch,
n: SequenceNumber,
_n: SequenceNumber,
) -> Result<(), mutable_batch::Error> {
self.state.write(batch)?;
self.sequence_range.observe(n);
Ok(())
}
}
@ -143,10 +130,6 @@ mod tests {
// Initialise a buffer in the base state.
let mut buffer: BufferState<Buffering> = BufferState::new();
// Validate the sequence number ranges are not populated.
assert!(buffer.sequence_number_range().inclusive_min().is_none());
assert!(buffer.sequence_number_range().inclusive_max().is_none());
// Write some data to a buffer.
buffer
.write(
@ -220,16 +203,6 @@ mod tests {
// Finally transition into the terminal persisting state.
let buffer: BufferState<Persisting> = buffer.into_persisting();
// Validate the sequence number ranges were updated as writes occurred.
assert_eq!(
buffer.sequence_number_range().inclusive_min(),
Some(SequenceNumber::new(0))
);
assert_eq!(
buffer.sequence_number_range().inclusive_max(),
Some(SequenceNumber::new(1))
);
// Extract the final buffered result
let final_data = buffer.into_data();

View File

@ -74,7 +74,7 @@ impl BufferState<Buffering> {
.expect("snapshot of non-empty buffer should succeed");
// And transition to the WithSnapshot state.
Transition::ok(Snapshot::new(vec![snap]), self.sequence_range)
Transition::ok(Snapshot::new(vec![snap]))
}
}

View File

@ -36,7 +36,6 @@ impl BufferState<Snapshot> {
assert!(!self.state.snapshots.is_empty());
BufferState {
state: Persisting::new(self.state.snapshots),
sequence_range: self.sequence_range,
}
}
}

View File

@ -16,13 +16,6 @@ use crate::{
deferred_load::DeferredLoad,
};
/// The data-carrying value of a `(table_id, partition_key)` lookup.
#[derive(Debug)]
struct Entry {
partition_id: PartitionId,
max_sequence_number: Option<SequenceNumber>,
}
/// A read-through cache mapping `(table_id, partition_key)` tuples to
/// `(partition_id, max_sequence_number)`.
///
@ -39,10 +32,9 @@ struct Entry {
/// - Partition key: String (8 len + 8 cap + 8 ptr + data len) = 34 bytes
/// - TableId: 8 bytes
/// - PartitionId: 8 bytes
/// - Optional sequence number: 16 bytes
///
/// For a total of 66 bytes per entry - approx 15,887 entries can be held in 1MB
/// of memory.
/// For a total of 50 bytes per entry - approx 20,971 entries can be held in
/// 1MiB of memory.
///
/// Each cache hit _removes_ the entry from the cache - this eliminates the
/// memory overhead for items that were hit. This is the expected (only valid!)
@ -74,7 +66,7 @@ pub(crate) struct PartitionCache<T> {
/// It's also likely a smaller N (more tables than partition keys) making it
/// a faster search for cache misses.
#[allow(clippy::type_complexity)]
entries: Mutex<HashMap<PartitionKey, HashMap<TableId, Entry>>>,
entries: Mutex<HashMap<PartitionKey, HashMap<TableId, PartitionId>>>,
/// Data needed to construct the [`SortKeyResolver`] for cached entries.
catalog: Arc<dyn Catalog>,
@ -103,15 +95,12 @@ impl<T> PartitionCache<T> {
where
P: IntoIterator<Item = Partition>,
{
let mut entries = HashMap::<PartitionKey, HashMap<TableId, Entry>>::new();
let mut entries = HashMap::<PartitionKey, HashMap<TableId, PartitionId>>::new();
for p in partitions.into_iter() {
entries.entry(p.partition_key).or_default().insert(
p.table_id,
Entry {
partition_id: p.id,
max_sequence_number: p.persisted_sequence_number,
},
);
entries
.entry(p.partition_key)
.or_default()
.insert(p.table_id, p.id);
}
// Minimise the overhead of the maps.
@ -135,7 +124,7 @@ impl<T> PartitionCache<T> {
&self,
table_id: TableId,
partition_key: &PartitionKey,
) -> Option<(PartitionKey, Entry)> {
) -> Option<(PartitionKey, PartitionId)> {
let mut entries = self.entries.lock();
// Look up the partition key provided by the caller.
@ -177,14 +166,14 @@ where
// Use the cached PartitionKey instead of the caller's partition_key,
// instead preferring to reuse the already-shared Arc<str> in the cache.
if let Some((key, cached)) = self.find(table_id, &partition_key) {
if let Some((key, partition_id)) = self.find(table_id, &partition_key) {
debug!(%table_id, %partition_key, "partition cache hit");
// Initialise a deferred resolver for the sort key.
let sort_key_resolver = DeferredLoad::new(
self.max_smear,
SortKeyResolver::new(
cached.partition_id,
partition_id,
Arc::clone(&__self.catalog),
self.backoff_config.clone(),
)
@ -195,13 +184,12 @@ where
// allows the backing str memory to be reused across all partitions
// using the same key!
return PartitionData::new(
cached.partition_id,
partition_id,
key,
namespace_id,
table_id,
table_name,
SortKeyState::Deferred(Arc::new(sort_key_resolver)),
cached.max_sequence_number,
);
}
@ -256,7 +244,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
);
let inner = MockPartitionProvider::default().with_partition(data);
@ -336,7 +323,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
let partition = Partition {
@ -377,7 +363,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
let partition = Partition {

View File

@ -84,7 +84,6 @@ impl PartitionProvider for CatalogPartitionResolver {
table_id,
table_name,
SortKeyState::Provided(p.sort_key()),
p.persisted_sequence_number,
)
}
}
@ -154,7 +153,6 @@ mod tests {
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(got.table_name().to_string(), table_name.to_string());
assert_matches!(got.sort_key(), SortKeyState::Provided(None));
assert_eq!(got.max_persisted_sequence_number(), None);
assert!(got.partition_key.ptr_eq(&callers_partition_key));
let got = catalog

View File

@ -69,7 +69,6 @@ mod tests {
table_id,
Arc::clone(&table_name),
SortKeyState::Provided(None),
None,
);
let mock = Arc::new(MockPartitionProvider::default().with_partition(data));

View File

@ -216,7 +216,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));
@ -336,7 +335,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)],
writes = [make_write_op(
&PartitionKey::from("p1"),
@ -369,7 +367,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
PartitionData::new(
PartitionId::new(1),
@ -380,7 +377,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)
],
writes = [
@ -425,7 +421,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
PartitionData::new(
PartitionId::new(1),
@ -436,7 +431,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)
],
writes = [
@ -480,7 +474,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
PartitionData::new(
PartitionId::new(1),
@ -491,7 +484,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)
],
writes = [
@ -536,7 +528,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)],
writes = [
make_write_op(
@ -583,7 +574,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
))
.with_partition(PartitionData::new(
PartitionId::new(0),
@ -594,7 +584,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)),
);
@ -668,7 +657,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));
@ -748,7 +736,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
))
.with_partition(PartitionData::new(
PartitionId::new(1),
@ -759,7 +746,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
)),
);

View File

@ -293,7 +293,6 @@ mod tests {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
));

View File

@ -1,4 +1,34 @@
//! IOx Ingester V2 implementation.
//!
//! ## Write Reordering
//!
//! A write that enters an `ingester2` instance can be reordered arbitrarily
//! with concurrent write requests.
//!
//! For example, two gRPC writes can race to be committed to the WAL, and then
//! race again to be buffered into the [`BufferTree`]. Writes to a
//! [`BufferTree`] may arrive out-of-order w.r.t their assigned sequence
//! numbers.
//!
//! This can also lead to the ordering of entries in the [`wal`] diverging from
//! the order of ops applied to the [`BufferTree`] (see
//! <https://github.com/influxdata/influxdb_iox/issues/6276>).
//!
//! This requires careful management, but ultimately allows for high levels of
//! parallelism when handling both writes and queries, increasing the
//! performance of both.
//!
//! Because of re-ordering, ranges of [`SequenceNumber`] cannot be used to
//! indirectly equality match (nor prefix match) the underlying data;
//! non-monotonic writes means overlapping ranges do not guarantee equality of
//! the set of operations they represent (gaps may be present). For example, a
//! range of sequence numbers bounded by `[0, 2)` for thread 1 may not contain
//! the same op data as another thread T2 with range with the same bounds due to
//! reordering causing T1 to observe `{0, 1, 2}` and T2 observing `{0, 2}` and
//! after a faulty range comparison, `{1}` to converge.
//!
//! [`BufferTree`]: crate::buffer_tree::BufferTree
//! [`SequenceNumber`]: data_types::SequenceNumber
#![allow(dead_code)] // Until ingester2 is used.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
@ -47,7 +77,6 @@ mod deferred_load;
mod dml_sink;
mod query;
mod query_adaptor;
mod sequence_range;
pub(crate) mod server;
mod timestamp_oracle;
mod wal;

View File

@ -1,141 +0,0 @@
use data_types::SequenceNumber;
/// A range of sequence numbers, both inclusive [min, max].
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub(crate) struct SequenceNumberRange {
range: Option<(SequenceNumber, SequenceNumber)>,
}
impl SequenceNumberRange {
pub(crate) fn observe(&mut self, n: SequenceNumber) {
self.range = Some(match self.range {
Some((min, max)) => {
assert!(n > max, "monotonicity violation");
(min, n)
}
None => (n, n),
});
}
/// Returns the inclusive lower bound on [`SequenceNumber`] values observed.
pub(crate) fn inclusive_min(&self) -> Option<SequenceNumber> {
self.range.map(|v| v.0)
}
/// Returns the inclusive upper bound on [`SequenceNumber`] values observed.
pub(crate) fn inclusive_max(&self) -> Option<SequenceNumber> {
self.range.map(|v| v.1)
}
/// Merge two [`SequenceNumberRange`] instances, returning a new, merged
/// instance.
///
/// The merge result contains the minimum of [`Self::inclusive_min()`] from
/// each instance, and the maximum of [`Self::inclusive_max()`].
///
/// If both `self` and `other` contain no [`SequenceNumber`] observations,
/// the returned instance contains no observations.
pub(crate) fn merge(&self, other: &Self) -> Self {
let merged_range = self
.range
.into_iter()
.chain(other.range)
.reduce(|a, b| (a.0.min(b.0), a.1.max(b.1)));
Self {
range: merged_range,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ranges() {
let mut r = SequenceNumberRange::default();
r.observe(SequenceNumber::new(0));
r.observe(SequenceNumber::new(2));
r.observe(SequenceNumber::new(3));
assert_eq!(r.inclusive_min(), Some(SequenceNumber::new(0)));
assert_eq!(r.inclusive_max(), Some(SequenceNumber::new(3)));
}
#[test]
#[should_panic = "monotonicity violation"]
fn test_monotonicity() {
let mut r = SequenceNumberRange::default();
r.observe(SequenceNumber::new(1));
r.observe(SequenceNumber::new(3));
r.observe(SequenceNumber::new(2));
}
#[test]
#[should_panic = "monotonicity violation"]
fn test_exactly_once() {
let mut r = SequenceNumberRange::default();
r.observe(SequenceNumber::new(1));
r.observe(SequenceNumber::new(1));
}
#[test]
fn test_merge() {
let mut a = SequenceNumberRange::default();
let mut b = SequenceNumberRange::default();
a.observe(SequenceNumber::new(4));
b.observe(SequenceNumber::new(2));
let a_b = a.merge(&b);
assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(2)));
assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4)));
let b_a = b.merge(&a);
assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(2)));
assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4)));
assert_eq!(a_b, b_a);
}
#[test]
fn test_merge_half_empty() {
let mut a = SequenceNumberRange::default();
let b = SequenceNumberRange::default();
a.observe(SequenceNumber::new(4));
// B observes nothing
let a_b = a.merge(&b);
assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(4)));
assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4)));
let b_a = b.merge(&a);
assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(4)));
assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4)));
assert_eq!(a_b, b_a);
}
#[test]
fn test_merge_both_empty() {
let a = SequenceNumberRange::default();
let b = SequenceNumberRange::default();
// Neither observe anything
let a_b = a.merge(&b);
assert_eq!(a_b.inclusive_min(), None);
assert_eq!(a_b.inclusive_max(), None);
let b_a = b.merge(&a);
assert_eq!(b_a.inclusive_min(), None);
assert_eq!(b_a.inclusive_max(), None);
assert_eq!(a_b, b_a);
}
}