diff --git a/dml/src/lib.rs b/dml/src/lib.rs index 7c13d3fd08..a37959af81 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -158,6 +158,20 @@ impl DmlOperation { Self::Delete(d) => d.namespace(), } } + + /// Namespace catalog ID associated with this operation + /// + /// # Safety + /// + /// Marked unsafe because of the critical invariant; Kafka conumers MUST NOT + /// utilise this method until this warning is removed. See [`DmlWrite`] + /// docs. + pub unsafe fn namespace_id(&self) -> NamespaceId { + match self { + Self::Write(w) => w.namespace_id(), + Self::Delete(d) => d.namespace_id(), + } + } } impl From for DmlOperation { @@ -360,6 +374,7 @@ impl DmlWrite { #[derive(Debug, Clone, PartialEq)] pub struct DmlDelete { namespace: String, + namespace_id: NamespaceId, predicate: DeletePredicate, table_name: Option, meta: DmlMeta, @@ -369,12 +384,14 @@ impl DmlDelete { /// Create a new [`DmlDelete`] pub fn new( namespace: impl Into, + namespace_id: NamespaceId, predicate: DeletePredicate, table_name: Option, meta: DmlMeta, ) -> Self { Self { namespace: namespace.into(), + namespace_id, predicate, table_name, meta, @@ -419,6 +436,17 @@ impl DmlDelete { + self.meta.size() - std::mem::size_of::() } + + /// Return the [`NamespaceId`] to which this operation should be applied. + /// + /// # Safety + /// + /// Marked unsafe because of the critical invariant; Kafka conumers MUST NOT + /// utilise this method until this warning is removed. See [`DmlWrite`] + /// docs. + pub unsafe fn namespace_id(&self) -> NamespaceId { + self.namespace_id + } } /// Test utilities diff --git a/generated_types/protos/influxdata/iox/delete/v1/service.proto b/generated_types/protos/influxdata/iox/delete/v1/service.proto index 42b98777d5..6a639d387c 100644 --- a/generated_types/protos/influxdata/iox/delete/v1/service.proto +++ b/generated_types/protos/influxdata/iox/delete/v1/service.proto @@ -38,6 +38,9 @@ message DeletePayload { // The name of the database string db_name = 1; + // The catalog ID for this database / namespace. + int64 database_id = 4; + // An optional table name to restrict this delete to string table_name = 2; diff --git a/influxdb_iox_client/src/client/delete.rs b/influxdb_iox_client/src/client/delete.rs index 4b4e64d26a..7451e79f7b 100644 --- a/influxdb_iox_client/src/client/delete.rs +++ b/influxdb_iox_client/src/client/delete.rs @@ -77,6 +77,7 @@ impl Client { pub async fn delete( &mut self, db_name: impl Into + Send, + database_id: i64, table_name: impl Into + Send, predicate: Predicate, ) -> Result<(), Error> { @@ -87,6 +88,7 @@ impl Client { .delete(DeleteRequest { payload: Some(DeletePayload { db_name, + database_id, table_name, predicate: Some(predicate), }), diff --git a/ingester/src/data.rs b/ingester/src/data.rs index db8a7332a4..20033c445a 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1523,6 +1523,7 @@ mod tests { }; let d1 = DmlDelete::new( "foo", + NamespaceId::new(42), predicate, Some(NonEmptyString::new("mem").unwrap()), DmlMeta::sequenced( diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 5654fc0190..3cc2c3854d 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -576,7 +576,7 @@ mod tests { None, 42, ); - DmlDelete::new(name, pred, None, sequence) + DmlDelete::new(name, NamespaceId::new(42), pred, None, sequence) } #[derive(Debug)] diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 63238c9f25..c88dc1c2e9 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -862,6 +862,7 @@ impl MockIngester { ); DmlOperation::Delete(DmlDelete::new( self.ns.namespace.name.clone(), + self.ns.namespace.id, predicate, Some(NonEmptyString::new(delete_table_name).unwrap()), meta, diff --git a/router/src/dml_handlers/sharded_write_buffer.rs b/router/src/dml_handlers/sharded_write_buffer.rs index fda8c721d3..5bd212b4ea 100644 --- a/router/src/dml_handlers/sharded_write_buffer.rs +++ b/router/src/dml_handlers/sharded_write_buffer.rs @@ -169,6 +169,7 @@ where let dml = DmlDelete::new( namespace, + namespace_id, predicate, NonEmptyString::new(table_name), DmlMeta::unsequenced(span_ctx), diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs index 60a1aa9af3..60823ffc71 100644 --- a/write_buffer/src/codec.rs +++ b/write_buffer/src/codec.rs @@ -225,6 +225,7 @@ pub fn decode( Ok(DmlOperation::Delete(DmlDelete::new( headers.namespace, + NamespaceId::new(delete.database_id), predicate, NonEmptyString::new(delete.table_name), meta, @@ -254,6 +255,13 @@ pub fn encode_operation( } DmlOperation::Delete(delete) => Payload::Delete(DeletePayload { db_name: db_name.to_string(), + database_id: unsafe { + // Safety: this code path is only invoked in the Kafka producer, so + // it is safe to utilise the ID. + // + // See DmlWrite docs for context. + delete.namespace_id().get() + }, table_name: delete .table_name() .map(ToString::to_string) diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index c074fd57d7..b8776bec39 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -857,6 +857,7 @@ mod tests { let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>); let op = DmlOperation::Delete(DmlDelete::new( namespace, + NamespaceId::new(42), DeletePredicate { range: TimestampRange::new(0, 1), exprs: vec![],