Merge pull request #7979 from influxdata/savage/replace-dml-operation-for-ingester-rpc-write

refactor(ingester): Add `data_types` module with `IngestOp` enumeration
pull/24376/head
kodiakhq[bot] 2023-06-13 13:35:00 +00:00 committed by GitHub
commit af92463848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 0 deletions

View File

@ -0,0 +1,63 @@
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use trace::ctx::SpanContext;
/// The set of operations which the ingester can derive and process from wire
/// requests
pub enum IngestOp {
Write(WriteOperation),
}
/// A decoded representation of the data contained by an RPC write
/// represented by an [`IngestOp::Write`]
pub struct WriteOperation {
namespace: NamespaceId,
tables: HashMap<TableId, TableData>,
partition_key: PartitionKey,
span_context: Option<SpanContext>,
}
impl WriteOperation {
/// Construct a new [`WriteOperation`] from the provided details.
///
/// # Panic
///
/// Panics if
///
/// - `tables` is empty
pub fn new(
namespace: NamespaceId,
tables: HashMap<TableId, TableData>,
partition_key: PartitionKey,
span_context: Option<SpanContext>,
) -> Self {
assert_ne!(tables.len(), 0);
Self {
namespace,
tables,
partition_key,
span_context,
}
}
}
/// A container for all data for an individual table as part of a write
/// operation
pub struct TableData {
table: TableId,
/// The partitioned data for `table` in the write. Currently data is
/// partitioned in a way that each table has a single partition of
// data associated with it per write
partitioned_data: PartitionedData,
}
/// Partitioned data belonging to a write, sequenced individually from
/// other [`PartitionedData`]
pub struct PartitionedData {
sequence_number: SequenceNumber,
data: MutableBatch,
}

View File

@ -0,0 +1,6 @@
//! A module containing crate-local data types that are shared across the
//! ingester's internals
mod ingest_op;
#[allow(unused_imports)]
pub(crate) use ingest_op::*;

View File

@ -241,6 +241,7 @@ mod arcmap;
mod buffer_tree;
mod cancellation_safe;
mod deferred_load;
mod dml_payload;
mod dml_sink;
mod ingest_state;
mod ingester_id;