From 3943faf9987c69d5113b68b3221a28fc69d6b9f2 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 14 Nov 2022 16:34:13 -0500 Subject: [PATCH] fix: Remove namespace from DmlWrite and DmlDelete constructors --- dml/src/lib.rs | 2 -- ingester/src/data.rs | 12 ------------ ingester/src/handler.rs | 3 --- ingester/src/stream_handler/handler.rs | 2 -- ingester/src/stream_handler/sink_instrumentation.rs | 1 - ingester/src/test_util.rs | 3 +-- ingester/tests/common/mod.rs | 1 - mutable_batch_tests/benches/write_pb.rs | 1 - query_tests/src/scenarios/util.rs | 2 -- router/src/dml_handlers/sharded_write_buffer.rs | 2 -- write_buffer/src/codec.rs | 3 --- write_buffer/src/core.rs | 4 +--- write_buffer/src/kafka/mod.rs | 6 ++---- write_buffer/src/kafka/record_aggregator.rs | 2 -- write_buffer/src/mock.rs | 2 -- 15 files changed, 4 insertions(+), 42 deletions(-) diff --git a/dml/src/lib.rs b/dml/src/lib.rs index f64bc5e869..29dfc20182 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -199,7 +199,6 @@ impl DmlWrite { /// - a MutableBatch is empty /// - a MutableBatch lacks an i64 "time" column pub fn new( - _namespace: impl Into, namespace_id: NamespaceId, tables: HashMap, table_ids: HashMap, @@ -337,7 +336,6 @@ pub struct DmlDelete { impl DmlDelete { /// Create a new [`DmlDelete`] pub fn new( - _namespace: impl Into, namespace_id: NamespaceId, predicate: DeletePredicate, table_name: Option, diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 9ee5d9ec16..30dd6e84be 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -771,7 +771,6 @@ mod tests { let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w1 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -815,7 +814,6 @@ mod tests { let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w2 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(&mut *catalog.repositories().await, namespace.id, &batch).await, @@ -876,7 +874,6 @@ mod tests { let batch = lines_to_batches("mem foo=1 10\nmem foo=1 11", 0).unwrap(); let w1 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1004,7 +1001,6 @@ mod tests { let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w1 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1023,7 +1019,6 @@ mod tests { let batch = lines_to_batches("cpu foo=1 10", 1).unwrap(); let w2 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1044,7 +1039,6 @@ mod tests { std::mem::drop(repos); let batch = lines_to_batches("mem foo=1 30", 2).unwrap(); let w3 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(&mut *catalog.repositories().await, namespace.id, &batch).await, @@ -1297,7 +1291,6 @@ mod tests { // write with sequence number 1 let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w1 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1317,7 +1310,6 @@ mod tests { // write with sequence number 2 let batch = lines_to_batches("mem foo=1 30\ncpu bar=1 20", 0).unwrap(); let w2 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1418,7 +1410,6 @@ mod tests { let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w1 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1432,7 +1423,6 @@ mod tests { ); let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w2 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1618,7 +1608,6 @@ mod tests { let batch = lines_to_batches("mem foo=1 10", 0).unwrap(); let w1 = DmlWrite::new( - "foo", namespace.id, batch.clone(), build_id_map(repos.deref_mut(), namespace.id, &batch).await, @@ -1663,7 +1652,6 @@ mod tests { exprs: vec![], }; let d1 = DmlDelete::new( - "foo", NamespaceId::new(42), predicate, Some(NonEmptyString::new("mem").unwrap()), diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index ad4b5cee3b..a72dbb68ab 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -551,7 +551,6 @@ mod tests { let ingest_ts1 = Time::from_timestamp_millis(42).unwrap(); let write_operations = vec![DmlWrite::new( - "foo", NamespaceId::new(1), lines_to_batches("cpu bar=2 20", 0).unwrap(), [("cpu".to_string(), TableId::new(1))].into_iter().collect(), @@ -579,7 +578,6 @@ mod tests { let ingest_ts1 = Time::from_timestamp_millis(42).unwrap(); let write_operations = vec![DmlWrite::new( - "foo", NamespaceId::new(1), lines_to_batches("cpu bar=2 20", 0).unwrap(), [("cpu".to_string(), TableId::new(1))].into_iter().collect(), @@ -607,7 +605,6 @@ mod tests { let ingest_ts1 = Time::from_timestamp_millis(42).unwrap(); let write_operations = vec![DmlWrite::new( - "foo", NamespaceId::new(1), lines_to_batches("cpu bar=2 20", 0).unwrap(), [("cpu".to_string(), TableId::new(1))].into_iter().collect(), diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 5f268bdc55..3260af5fa8 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -553,7 +553,6 @@ mod tests { 42, ); DmlWrite::new( - "deprecated", NamespaceId::new(namespace_id), tables, ids, @@ -577,7 +576,6 @@ mod tests { 42, ); DmlDelete::new( - "deprecated", NamespaceId::new(namespace_id), pred, None, diff --git a/ingester/src/stream_handler/sink_instrumentation.rs b/ingester/src/stream_handler/sink_instrumentation.rs index 84259f533c..a4cc8f5ac6 100644 --- a/ingester/src/stream_handler/sink_instrumentation.rs +++ b/ingester/src/stream_handler/sink_instrumentation.rs @@ -278,7 +278,6 @@ mod tests { .map(|(i, v)| (v.clone(), TableId::new(i as _))) .collect(); DmlWrite::new( - "bananas", NamespaceId::new(42), tables, ids, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 77e2bd8394..49a976706e 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -588,7 +588,7 @@ pub(crate) fn make_partitions( pub(crate) fn make_write_op( partition_key: &PartitionKey, shard_index: ShardIndex, - namespace: &str, + _namespace: &str, namespace_id: NamespaceId, table_id: TableId, sequence_number: i64, @@ -600,7 +600,6 @@ pub(crate) fn make_write_op( let ids = [(TEST_TABLE.into(), table_id)].into_iter().collect(); DmlWrite::new( - namespace.to_string(), namespace_id, tables, ids, diff --git a/ingester/tests/common/mod.rs b/ingester/tests/common/mod.rs index 716ba3b544..113f1b22bc 100644 --- a/ingester/tests/common/mod.rs +++ b/ingester/tests/common/mod.rs @@ -294,7 +294,6 @@ impl TestContext { .await; self.enqueue_write(DmlWrite::new( - namespace, namespace_id, lines_to_batches(lp, 0).unwrap(), ids.clone(), diff --git a/mutable_batch_tests/benches/write_pb.rs b/mutable_batch_tests/benches/write_pb.rs index f462b3aea1..9d0a847f4a 100644 --- a/mutable_batch_tests/benches/write_pb.rs +++ b/mutable_batch_tests/benches/write_pb.rs @@ -20,7 +20,6 @@ fn generate_pbdata_bytes() -> Vec<(String, (usize, Bytes))> { .collect(); let write = DmlWrite::new( - "test_db", NamespaceId::new(42), batches, ids, diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 5a7f3a2c67..f28efa823d 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -835,7 +835,6 @@ impl MockIngester { 0, ); let op = DmlOperation::Write(DmlWrite::new( - self.ns.namespace.name.clone(), self.ns.namespace.id, mutable_batches, ids, @@ -862,7 +861,6 @@ impl MockIngester { 0, ); DmlOperation::Delete(DmlDelete::new( - self.ns.namespace.name.clone(), self.ns.namespace.id, predicate, Some(NonEmptyString::new(delete_table_name).unwrap()), diff --git a/router/src/dml_handlers/sharded_write_buffer.rs b/router/src/dml_handlers/sharded_write_buffer.rs index 9a8fc7c7e5..385528deb9 100644 --- a/router/src/dml_handlers/sharded_write_buffer.rs +++ b/router/src/dml_handlers/sharded_write_buffer.rs @@ -131,7 +131,6 @@ where let iter = collated.into_iter().map(|(shard, batch)| { let dml = DmlWrite::new( - namespace, namespace_id, batch, table_ids.remove(&shard).unwrap(), @@ -168,7 +167,6 @@ where let shards = self.sharder.shard(table_name, namespace, &predicate); let dml = DmlDelete::new( - namespace, namespace_id, predicate, NonEmptyString::new(table_name), diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs index 620128f15d..5081e064f1 100644 --- a/write_buffer/src/codec.rs +++ b/write_buffer/src/codec.rs @@ -178,7 +178,6 @@ pub fn decode( }; Ok(DmlOperation::Write(DmlWrite::new( - "deprecated", NamespaceId::new(write.database_id), tables, ids.into_iter().map(|(k, v)| (k, TableId::new(v))).collect(), @@ -193,7 +192,6 @@ pub fn decode( .map_err(WriteBufferError::invalid_data)?; Ok(DmlOperation::Delete(DmlDelete::new( - "deprecated", NamespaceId::new(delete.database_id), predicate, NonEmptyString::new(delete.table_name), @@ -309,7 +307,6 @@ mod tests { let (data, ids) = lp_to_batches("platanos great=42 100\nbananas greatness=1000 100"); let w = DmlWrite::new( - "bananas", NamespaceId::new(42), data, ids, diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 7b6336813a..ceaf28d32b 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -392,7 +392,7 @@ pub mod test_utils { /// Writes line protocol and returns the [`DmlWrite`] that was written. pub async fn write( - namespace: &str, + _namespace: &str, writer: &impl WriteBufferWriting, lp: &str, shard_index: ShardIndex, @@ -402,7 +402,6 @@ pub mod test_utils { let (tables, names) = lp_to_batches(lp); let write = DmlWrite::new( - namespace, NamespaceId::new(42), tables, names, @@ -1251,7 +1250,6 @@ pub mod test_utils { let (tables, names) = lp_to_batches("upc user=1 100"); let write = DmlWrite::new( - "foo", NamespaceId::new(42), tables, names, diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 81eb8e3231..2036d5dd31 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -829,7 +829,7 @@ mod tests { } async fn write( - namespace: &str, + _namespace: &str, producer: &RSKafkaProducer, trace_collector: &Arc, shard_index: ShardIndex, @@ -838,7 +838,6 @@ mod tests { let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>); let (tables, names) = lp_to_batches("table foo=1"); let write = DmlWrite::new( - namespace, NamespaceId::new(42), tables, names, @@ -850,14 +849,13 @@ mod tests { } async fn delete( - namespace: &str, + _namespace: &str, producer: &RSKafkaProducer, trace_collector: &Arc, shard_index: ShardIndex, ) -> DmlMeta { let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>); let op = DmlOperation::Delete(DmlDelete::new( - namespace, NamespaceId::new(42), DeletePredicate { range: TimestampRange::new(0, 1), diff --git a/write_buffer/src/kafka/record_aggregator.rs b/write_buffer/src/kafka/record_aggregator.rs index fddcebb23a..3bf75a4bd9 100644 --- a/write_buffer/src/kafka/record_aggregator.rs +++ b/write_buffer/src/kafka/record_aggregator.rs @@ -207,7 +207,6 @@ mod tests { use super::*; - const NAMESPACE: &str = "bananas"; const SHARD_INDEX: ShardIndex = ShardIndex::new(42); const TIMESTAMP_MILLIS: i64 = 1659990497000; @@ -233,7 +232,6 @@ mod tests { .collect(); DmlOperation::Write(DmlWrite::new( - NAMESPACE.to_string(), NamespaceId::new(42), m, ids, diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index 629e27fafd..77aa9ec250 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -178,7 +178,6 @@ impl MockBufferSharedState { let (tables, names) = lp_to_batches(lp); let meta = DmlMeta::sequenced(sequence, iox_time::Time::from_timestamp_nanos(0), None, 0); self.push_write(DmlWrite::new( - "foo", NamespaceId::new(42), tables, names, @@ -905,7 +904,6 @@ mod tests { let (tables, names) = lp_to_batches("upc user=1 100"); let operation = DmlOperation::Write(DmlWrite::new( - "test_db", NamespaceId::new(42), tables, names,