refactor(ingester): Implement `From<DmlOperation>` for `IngestOp`

This commit implements the `From` trait to allow quick conversion from
`DmlOperation::DmlWrite` to `IngestOp::WriteOperation`. This conversion
performs some copies and should be removed once the RPC write path has
been switched to use `IngestOp`.
pull/24376/head
Fraser Savage 2023-06-15 16:27:23 +01:00
parent 1e22d7a25c
commit 5ca7bd58f4
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
1 changed files with 66 additions and 0 deletions

View File

@ -1,4 +1,5 @@
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
use dml::{DmlOperation, DmlWrite};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use trace::ctx::SpanContext;
@ -9,6 +10,17 @@ pub enum IngestOp {
Write(WriteOperation),
}
impl From<DmlOperation> for IngestOp {
fn from(value: DmlOperation) -> Self {
match value {
DmlOperation::Write(w) => Self::Write(WriteOperation::from(w)),
DmlOperation::Delete(_) => {
panic!("no corresponding ingest operation exists for DML delete")
}
}
}
}
/// A decoded representation of the data contained by an RPC write
/// represented by an [`IngestOp::Write`]
pub struct WriteOperation {
@ -45,6 +57,42 @@ impl WriteOperation {
}
}
// TODO(savage): Temporary [`From`] implementation to assist in switchover
// within ingester code. This should be removed in favour of constructing all
// [`WriteOperation`]s directly
impl From<DmlWrite> for WriteOperation {
fn from(dml_write: DmlWrite) -> Self {
let namespace_id = dml_write.namespace_id();
let partition_key = dml_write.partition_key().to_owned();
let sequence_number = dml_write
.meta()
.sequence()
.expect("tried to create write operation from unsequenced DML write");
let span_context = dml_write.meta().span_context().map(SpanContext::to_owned);
Self::new(
namespace_id,
dml_write
.into_tables()
.map(|(table, data)| {
(
table,
TableData {
table,
partitioned_data: PartitionedData {
sequence_number,
data,
},
},
)
})
.collect(),
partition_key,
span_context,
)
}
}
/// A container for all data for an individual table as part of a write
/// operation
pub struct TableData {
@ -55,9 +103,27 @@ pub struct TableData {
partitioned_data: PartitionedData,
}
impl TableData {
pub fn new(table: TableId, partitioned_data: PartitionedData) -> Self {
Self {
table,
partitioned_data,
}
}
}
/// Partitioned data belonging to a write, sequenced individually from
/// other [`PartitionedData`]
pub struct PartitionedData {
sequence_number: SequenceNumber,
data: MutableBatch,
}
impl PartitionedData {
pub fn new(sequence_number: SequenceNumber, data: MutableBatch) -> Self {
Self {
sequence_number,
data,
}
}
}