From e5719cffff4b4c79be9fb0528f39fe52336b3ce2 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Tue, 13 Jun 2023 10:34:59 +0100 Subject: [PATCH 1/2] refactor(ingester): Add `data_types` module with `IngestOp` enumeration The `dml` crate and its contained types simultaneously contain more and less data than the ingester needs for writes. This type is to replace the use of `DmlOperation` and `DmlWrite` within the ingester's internals so that the type can be specialised with low blast-radius changes. The key change here is to remove the ties to the `DmlMeta` construction and allow sequencing of data on a per-partition basis --- ingester/src/data_types/ingest_op.rs | 63 ++++++++++++++++++++++++++++ ingester/src/data_types/mod.rs | 6 +++ ingester/src/lib.rs | 3 +- 3 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 ingester/src/data_types/ingest_op.rs create mode 100644 ingester/src/data_types/mod.rs diff --git a/ingester/src/data_types/ingest_op.rs b/ingester/src/data_types/ingest_op.rs new file mode 100644 index 0000000000..0fdce56110 --- /dev/null +++ b/ingester/src/data_types/ingest_op.rs @@ -0,0 +1,63 @@ +use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; +use hashbrown::HashMap; +use mutable_batch::MutableBatch; +use trace::ctx::SpanContext; + +/// The set of operations which the ingester can derive and process from wire +/// requests +pub enum IngestOp { + Write(WriteOperation), +} + +/// A decoded representation of the data contained by an RPC write +/// represented by an [`IngestOp::Write`] +pub struct WriteOperation { + namespace: NamespaceId, + + tables: HashMap<TableId, TableData>, + partition_key: PartitionKey, + + span_context: Option<SpanContext>, +} + +impl WriteOperation { + /// Construct a new [`WriteOperation`] from the provided details.
+ /// + /// # Panics + /// + /// Panics if + /// + /// - `tables` is empty + pub fn new( + namespace: NamespaceId, + tables: HashMap<TableId, TableData>, + partition_key: PartitionKey, + span_context: Option<SpanContext>, + ) -> Self { + assert_ne!(tables.len(), 0); + + Self { + namespace, + tables, + partition_key, + span_context, + } + } +} + +/// A container for all data for an individual table as part of a write +/// operation +pub struct TableData { + table: TableId, + /// The partitioned data for `table` in the write. Currently data is + /// partitioned in a way that each table has a single partition of + /// data associated with it per write + partitioned_data: PartitionedData, +} + +/// Partitioned data belonging to a write, sequenced individually from +/// other [`PartitionedData`] +pub struct PartitionedData { + sequence_number: SequenceNumber, + data: MutableBatch, +} diff --git a/ingester/src/data_types/mod.rs b/ingester/src/data_types/mod.rs new file mode 100644 index 0000000000..d6e37b39cb --- /dev/null +++ b/ingester/src/data_types/mod.rs @@ -0,0 +1,6 @@ +//! A module containing crate-local data types that are shared across the +//! ingester's internals + +mod ingest_op; +#[allow(unused_imports)] +pub(crate) use ingest_op::*; diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 6fb7e939bf..4dbd26100c 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -175,7 +175,7 @@ //! after a faulty range comparison, `{1}` to converge. //! //! [`BufferTree`]: crate::buffer_tree::BufferTree -//! [`SequenceNumber`]: data_types::SequenceNumber +//! [`SequenceNumber`]: ::data_types::SequenceNumber //! [`PersistQueue`]: crate::persist::queue::PersistQueue //! [`PersistHandle`]: crate::persist::handle::PersistHandle //!
[`IngestState`]: crate::ingest_state::IngestState @@ -240,6 +240,7 @@ pub mod internal_implementation_details { mod arcmap; mod buffer_tree; mod cancellation_safe; +mod data_types; mod deferred_load; mod dml_sink; mod ingest_state; From 4909d4122fb1f772b4ac01fbcae60ce76bdc61ee Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Tue, 13 Jun 2023 14:23:37 +0100 Subject: [PATCH 2/2] refactor(ingester): Rename `data_types` module to `dml_payload` --- ingester/src/{data_types => dml_payload}/ingest_op.rs | 0 ingester/src/{data_types => dml_payload}/mod.rs | 0 ingester/src/lib.rs | 4 ++-- 3 files changed, 2 insertions(+), 2 deletions(-) rename ingester/src/{data_types => dml_payload}/ingest_op.rs (100%) rename ingester/src/{data_types => dml_payload}/mod.rs (100%) diff --git a/ingester/src/data_types/ingest_op.rs b/ingester/src/dml_payload/ingest_op.rs similarity index 100% rename from ingester/src/data_types/ingest_op.rs rename to ingester/src/dml_payload/ingest_op.rs diff --git a/ingester/src/data_types/mod.rs b/ingester/src/dml_payload/mod.rs similarity index 100% rename from ingester/src/data_types/mod.rs rename to ingester/src/dml_payload/mod.rs diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 4dbd26100c..6719078fd6 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -175,7 +175,7 @@ //! after a faulty range comparison, `{1}` to converge. //! //! [`BufferTree`]: crate::buffer_tree::BufferTree -//! [`SequenceNumber`]: ::data_types::SequenceNumber +//! [`SequenceNumber`]: data_types::SequenceNumber //! [`PersistQueue`]: crate::persist::queue::PersistQueue //! [`PersistHandle`]: crate::persist::handle::PersistHandle //! [`IngestState`]: crate::ingest_state::IngestState @@ -240,8 +240,8 @@ pub mod internal_implementation_details { mod arcmap; mod buffer_tree; mod cancellation_safe; -mod data_types; mod deferred_load; +mod dml_payload; mod dml_sink; mod ingest_state; mod ingester_id;