feat: NamespaceId in DmlDelete

Changes the DmlDelete to contain the NamespaceId for which it should be
applied, propagating this value over the wire.

Like the existing IDs within the DmlWrite, these values are marked
unsafe to use due to avoid the consumers utilising them accidentally
during deployment. Unlike DmlWrite, the DmlDelete is completely unused,
so this is less of an issue.
pull/24376/head
Dom Dwyer 2022-11-03 12:00:48 +01:00
parent a7835009d8
commit 6fa48731aa
9 changed files with 46 additions and 1 deletions

View File

@ -158,6 +158,20 @@ impl DmlOperation {
Self::Delete(d) => d.namespace(),
}
}
/// Namespace catalog ID associated with this operation
///
/// # Safety
///
/// Marked unsafe because of the critical invariant; Kafka conumers MUST NOT
/// utilise this method until this warning is removed. See [`DmlWrite`]
/// docs.
pub unsafe fn namespace_id(&self) -> NamespaceId {
match self {
Self::Write(w) => w.namespace_id(),
Self::Delete(d) => d.namespace_id(),
}
}
}
impl From<DmlWrite> for DmlOperation {
@ -360,6 +374,7 @@ impl DmlWrite {
#[derive(Debug, Clone, PartialEq)]
pub struct DmlDelete {
namespace: String,
namespace_id: NamespaceId,
predicate: DeletePredicate,
table_name: Option<NonEmptyString>,
meta: DmlMeta,
@ -369,12 +384,14 @@ impl DmlDelete {
/// Create a new [`DmlDelete`]
pub fn new(
namespace: impl Into<String>,
namespace_id: NamespaceId,
predicate: DeletePredicate,
table_name: Option<NonEmptyString>,
meta: DmlMeta,
) -> Self {
Self {
namespace: namespace.into(),
namespace_id,
predicate,
table_name,
meta,
@ -419,6 +436,17 @@ impl DmlDelete {
+ self.meta.size()
- std::mem::size_of::<DmlMeta>()
}
/// Return the [`NamespaceId`] to which this operation should be applied.
///
/// # Safety
///
/// Marked unsafe because of the critical invariant; Kafka conumers MUST NOT
/// utilise this method until this warning is removed. See [`DmlWrite`]
/// docs.
pub unsafe fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
}
/// Test utilities

View File

@ -38,6 +38,9 @@ message DeletePayload {
// The name of the database
string db_name = 1;
// The catalog ID for this database / namespace.
int64 database_id = 4;
// An optional table name to restrict this delete to
string table_name = 2;

View File

@ -77,6 +77,7 @@ impl Client {
pub async fn delete(
&mut self,
db_name: impl Into<String> + Send,
database_id: i64,
table_name: impl Into<String> + Send,
predicate: Predicate,
) -> Result<(), Error> {
@ -87,6 +88,7 @@ impl Client {
.delete(DeleteRequest {
payload: Some(DeletePayload {
db_name,
database_id,
table_name,
predicate: Some(predicate),
}),

View File

@ -1523,6 +1523,7 @@ mod tests {
};
let d1 = DmlDelete::new(
"foo",
NamespaceId::new(42),
predicate,
Some(NonEmptyString::new("mem").unwrap()),
DmlMeta::sequenced(

View File

@ -576,7 +576,7 @@ mod tests {
None,
42,
);
DmlDelete::new(name, pred, None, sequence)
DmlDelete::new(name, NamespaceId::new(42), pred, None, sequence)
}
#[derive(Debug)]

View File

@ -862,6 +862,7 @@ impl MockIngester {
);
DmlOperation::Delete(DmlDelete::new(
self.ns.namespace.name.clone(),
self.ns.namespace.id,
predicate,
Some(NonEmptyString::new(delete_table_name).unwrap()),
meta,

View File

@ -169,6 +169,7 @@ where
let dml = DmlDelete::new(
namespace,
namespace_id,
predicate,
NonEmptyString::new(table_name),
DmlMeta::unsequenced(span_ctx),

View File

@ -225,6 +225,7 @@ pub fn decode(
Ok(DmlOperation::Delete(DmlDelete::new(
headers.namespace,
NamespaceId::new(delete.database_id),
predicate,
NonEmptyString::new(delete.table_name),
meta,
@ -254,6 +255,13 @@ pub fn encode_operation(
}
DmlOperation::Delete(delete) => Payload::Delete(DeletePayload {
db_name: db_name.to_string(),
database_id: unsafe {
// Safety: this code path is only invoked in the Kafka producer, so
// it is safe to utilise the ID.
//
// See DmlWrite docs for context.
delete.namespace_id().get()
},
table_name: delete
.table_name()
.map(ToString::to_string)

View File

@ -857,6 +857,7 @@ mod tests {
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
let op = DmlOperation::Delete(DmlDelete::new(
namespace,
NamespaceId::new(42),
DeletePredicate {
range: TimestampRange::new(0, 1),
exprs: vec![],