refactor(ingester): Add `data_types` module with `IngestOp` enumeration

The `dml` crate and its contained types simultaneously contain more and
less data than the ingester needs for writes. This type is to replace
the use of `DmlOperation` and `DmlWrite` within the ingester's internals
so that the type can be specialised with low blast-radius changes.

The key change here is to remove the ties to the `DmlMeta` construction
and allow sequencing of data on a per-partition basis
pull/24376/head
Fraser Savage 2023-06-13 10:34:59 +01:00
parent fad11b524f
commit e5719cffff
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
3 changed files with 71 additions and 1 deletions

View File

@ -0,0 +1,63 @@
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use trace::ctx::SpanContext;
/// The set of operations which the ingester can derive and process from wire
/// requests
pub enum IngestOp {
Write(WriteOperation),
}
/// A decoded representation of the data contained by an RPC write
/// represented by an [`IngestOp::Write`]
pub struct WriteOperation {
namespace: NamespaceId,
tables: HashMap<TableId, TableData>,
partition_key: PartitionKey,
span_context: Option<SpanContext>,
}
impl WriteOperation {
/// Construct a new [`WriteOperation`] from the provided details.
///
/// # Panic
///
/// Panics if
///
/// - `tables` is empty
pub fn new(
namespace: NamespaceId,
tables: HashMap<TableId, TableData>,
partition_key: PartitionKey,
span_context: Option<SpanContext>,
) -> Self {
assert_ne!(tables.len(), 0);
Self {
namespace,
tables,
partition_key,
span_context,
}
}
}
/// A container for all data for an individual table as part of a write
/// operation
pub struct TableData {
table: TableId,
/// The partitioned data for `table` in the write. Currently data is
/// partitioned in a way that each table has a single partition of
// data associated with it per write
partitioned_data: PartitionedData,
}
/// Partitioned data belonging to a write, sequenced individually from
/// other [`PartitionedData`]
pub struct PartitionedData {
sequence_number: SequenceNumber,
data: MutableBatch,
}

View File

@ -0,0 +1,6 @@
//! A module containing crate-local data types that are shared across the
//! ingester's internals
mod ingest_op;
#[allow(unused_imports)]
pub(crate) use ingest_op::*;

View File

@ -175,7 +175,7 @@
//! after a faulty range comparison, `{1}` to converge.
//!
//! [`BufferTree`]: crate::buffer_tree::BufferTree
//! [`SequenceNumber`]: data_types::SequenceNumber
//! [`SequenceNumber`]: ::data_types::SequenceNumber
//! [`PersistQueue`]: crate::persist::queue::PersistQueue
//! [`PersistHandle`]: crate::persist::handle::PersistHandle
//! [`IngestState`]: crate::ingest_state::IngestState
@ -240,6 +240,7 @@ pub mod internal_implementation_details {
mod arcmap;
mod buffer_tree;
mod cancellation_safe;
mod data_types;
mod deferred_load;
mod dml_sink;
mod ingest_state;