fix: Remove namespace from DmlWrite and DmlDelete constructors
parent
f78195f7c7
commit
3943faf998
|
@ -199,7 +199,6 @@ impl DmlWrite {
|
|||
/// - a MutableBatch is empty
|
||||
/// - a MutableBatch lacks an i64 "time" column
|
||||
pub fn new(
|
||||
_namespace: impl Into<String>,
|
||||
namespace_id: NamespaceId,
|
||||
tables: HashMap<String, MutableBatch>,
|
||||
table_ids: HashMap<String, TableId>,
|
||||
|
@ -337,7 +336,6 @@ pub struct DmlDelete {
|
|||
impl DmlDelete {
|
||||
/// Create a new [`DmlDelete`]
|
||||
pub fn new(
|
||||
_namespace: impl Into<String>,
|
||||
namespace_id: NamespaceId,
|
||||
predicate: DeletePredicate,
|
||||
table_name: Option<NonEmptyString>,
|
||||
|
|
|
@ -771,7 +771,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w1 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -815,7 +814,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w2 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(&mut *catalog.repositories().await, namespace.id, &batch).await,
|
||||
|
@ -876,7 +874,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("mem foo=1 10\nmem foo=1 11", 0).unwrap();
|
||||
let w1 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1004,7 +1001,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w1 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1023,7 +1019,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("cpu foo=1 10", 1).unwrap();
|
||||
let w2 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1044,7 +1039,6 @@ mod tests {
|
|||
std::mem::drop(repos);
|
||||
let batch = lines_to_batches("mem foo=1 30", 2).unwrap();
|
||||
let w3 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(&mut *catalog.repositories().await, namespace.id, &batch).await,
|
||||
|
@ -1297,7 +1291,6 @@ mod tests {
|
|||
// write with sequence number 1
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w1 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1317,7 +1310,6 @@ mod tests {
|
|||
// write with sequence number 2
|
||||
let batch = lines_to_batches("mem foo=1 30\ncpu bar=1 20", 0).unwrap();
|
||||
let w2 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1418,7 +1410,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w1 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1432,7 +1423,6 @@ mod tests {
|
|||
);
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w2 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1618,7 +1608,6 @@ mod tests {
|
|||
|
||||
let batch = lines_to_batches("mem foo=1 10", 0).unwrap();
|
||||
let w1 = DmlWrite::new(
|
||||
"foo",
|
||||
namespace.id,
|
||||
batch.clone(),
|
||||
build_id_map(repos.deref_mut(), namespace.id, &batch).await,
|
||||
|
@ -1663,7 +1652,6 @@ mod tests {
|
|||
exprs: vec![],
|
||||
};
|
||||
let d1 = DmlDelete::new(
|
||||
"foo",
|
||||
NamespaceId::new(42),
|
||||
predicate,
|
||||
Some(NonEmptyString::new("mem").unwrap()),
|
||||
|
|
|
@ -551,7 +551,6 @@ mod tests {
|
|||
|
||||
let ingest_ts1 = Time::from_timestamp_millis(42).unwrap();
|
||||
let write_operations = vec![DmlWrite::new(
|
||||
"foo",
|
||||
NamespaceId::new(1),
|
||||
lines_to_batches("cpu bar=2 20", 0).unwrap(),
|
||||
[("cpu".to_string(), TableId::new(1))].into_iter().collect(),
|
||||
|
@ -579,7 +578,6 @@ mod tests {
|
|||
|
||||
let ingest_ts1 = Time::from_timestamp_millis(42).unwrap();
|
||||
let write_operations = vec![DmlWrite::new(
|
||||
"foo",
|
||||
NamespaceId::new(1),
|
||||
lines_to_batches("cpu bar=2 20", 0).unwrap(),
|
||||
[("cpu".to_string(), TableId::new(1))].into_iter().collect(),
|
||||
|
@ -607,7 +605,6 @@ mod tests {
|
|||
|
||||
let ingest_ts1 = Time::from_timestamp_millis(42).unwrap();
|
||||
let write_operations = vec![DmlWrite::new(
|
||||
"foo",
|
||||
NamespaceId::new(1),
|
||||
lines_to_batches("cpu bar=2 20", 0).unwrap(),
|
||||
[("cpu".to_string(), TableId::new(1))].into_iter().collect(),
|
||||
|
|
|
@ -553,7 +553,6 @@ mod tests {
|
|||
42,
|
||||
);
|
||||
DmlWrite::new(
|
||||
"deprecated",
|
||||
NamespaceId::new(namespace_id),
|
||||
tables,
|
||||
ids,
|
||||
|
@ -577,7 +576,6 @@ mod tests {
|
|||
42,
|
||||
);
|
||||
DmlDelete::new(
|
||||
"deprecated",
|
||||
NamespaceId::new(namespace_id),
|
||||
pred,
|
||||
None,
|
||||
|
|
|
@ -278,7 +278,6 @@ mod tests {
|
|||
.map(|(i, v)| (v.clone(), TableId::new(i as _)))
|
||||
.collect();
|
||||
DmlWrite::new(
|
||||
"bananas",
|
||||
NamespaceId::new(42),
|
||||
tables,
|
||||
ids,
|
||||
|
|
|
@ -588,7 +588,7 @@ pub(crate) fn make_partitions(
|
|||
pub(crate) fn make_write_op(
|
||||
partition_key: &PartitionKey,
|
||||
shard_index: ShardIndex,
|
||||
namespace: &str,
|
||||
_namespace: &str,
|
||||
namespace_id: NamespaceId,
|
||||
table_id: TableId,
|
||||
sequence_number: i64,
|
||||
|
@ -600,7 +600,6 @@ pub(crate) fn make_write_op(
|
|||
|
||||
let ids = [(TEST_TABLE.into(), table_id)].into_iter().collect();
|
||||
DmlWrite::new(
|
||||
namespace.to_string(),
|
||||
namespace_id,
|
||||
tables,
|
||||
ids,
|
||||
|
|
|
@ -294,7 +294,6 @@ impl TestContext {
|
|||
.await;
|
||||
|
||||
self.enqueue_write(DmlWrite::new(
|
||||
namespace,
|
||||
namespace_id,
|
||||
lines_to_batches(lp, 0).unwrap(),
|
||||
ids.clone(),
|
||||
|
|
|
@ -20,7 +20,6 @@ fn generate_pbdata_bytes() -> Vec<(String, (usize, Bytes))> {
|
|||
.collect();
|
||||
|
||||
let write = DmlWrite::new(
|
||||
"test_db",
|
||||
NamespaceId::new(42),
|
||||
batches,
|
||||
ids,
|
||||
|
|
|
@ -835,7 +835,6 @@ impl MockIngester {
|
|||
0,
|
||||
);
|
||||
let op = DmlOperation::Write(DmlWrite::new(
|
||||
self.ns.namespace.name.clone(),
|
||||
self.ns.namespace.id,
|
||||
mutable_batches,
|
||||
ids,
|
||||
|
@ -862,7 +861,6 @@ impl MockIngester {
|
|||
0,
|
||||
);
|
||||
DmlOperation::Delete(DmlDelete::new(
|
||||
self.ns.namespace.name.clone(),
|
||||
self.ns.namespace.id,
|
||||
predicate,
|
||||
Some(NonEmptyString::new(delete_table_name).unwrap()),
|
||||
|
|
|
@ -131,7 +131,6 @@ where
|
|||
|
||||
let iter = collated.into_iter().map(|(shard, batch)| {
|
||||
let dml = DmlWrite::new(
|
||||
namespace,
|
||||
namespace_id,
|
||||
batch,
|
||||
table_ids.remove(&shard).unwrap(),
|
||||
|
@ -168,7 +167,6 @@ where
|
|||
let shards = self.sharder.shard(table_name, namespace, &predicate);
|
||||
|
||||
let dml = DmlDelete::new(
|
||||
namespace,
|
||||
namespace_id,
|
||||
predicate,
|
||||
NonEmptyString::new(table_name),
|
||||
|
|
|
@ -178,7 +178,6 @@ pub fn decode(
|
|||
};
|
||||
|
||||
Ok(DmlOperation::Write(DmlWrite::new(
|
||||
"deprecated",
|
||||
NamespaceId::new(write.database_id),
|
||||
tables,
|
||||
ids.into_iter().map(|(k, v)| (k, TableId::new(v))).collect(),
|
||||
|
@ -193,7 +192,6 @@ pub fn decode(
|
|||
.map_err(WriteBufferError::invalid_data)?;
|
||||
|
||||
Ok(DmlOperation::Delete(DmlDelete::new(
|
||||
"deprecated",
|
||||
NamespaceId::new(delete.database_id),
|
||||
predicate,
|
||||
NonEmptyString::new(delete.table_name),
|
||||
|
@ -309,7 +307,6 @@ mod tests {
|
|||
let (data, ids) = lp_to_batches("platanos great=42 100\nbananas greatness=1000 100");
|
||||
|
||||
let w = DmlWrite::new(
|
||||
"bananas",
|
||||
NamespaceId::new(42),
|
||||
data,
|
||||
ids,
|
||||
|
|
|
@ -392,7 +392,7 @@ pub mod test_utils {
|
|||
|
||||
/// Writes line protocol and returns the [`DmlWrite`] that was written.
|
||||
pub async fn write(
|
||||
namespace: &str,
|
||||
_namespace: &str,
|
||||
writer: &impl WriteBufferWriting,
|
||||
lp: &str,
|
||||
shard_index: ShardIndex,
|
||||
|
@ -402,7 +402,6 @@ pub mod test_utils {
|
|||
let (tables, names) = lp_to_batches(lp);
|
||||
|
||||
let write = DmlWrite::new(
|
||||
namespace,
|
||||
NamespaceId::new(42),
|
||||
tables,
|
||||
names,
|
||||
|
@ -1251,7 +1250,6 @@ pub mod test_utils {
|
|||
|
||||
let (tables, names) = lp_to_batches("upc user=1 100");
|
||||
let write = DmlWrite::new(
|
||||
"foo",
|
||||
NamespaceId::new(42),
|
||||
tables,
|
||||
names,
|
||||
|
|
|
@ -829,7 +829,7 @@ mod tests {
|
|||
}
|
||||
|
||||
async fn write(
|
||||
namespace: &str,
|
||||
_namespace: &str,
|
||||
producer: &RSKafkaProducer,
|
||||
trace_collector: &Arc<RingBufferTraceCollector>,
|
||||
shard_index: ShardIndex,
|
||||
|
@ -838,7 +838,6 @@ mod tests {
|
|||
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
|
||||
let (tables, names) = lp_to_batches("table foo=1");
|
||||
let write = DmlWrite::new(
|
||||
namespace,
|
||||
NamespaceId::new(42),
|
||||
tables,
|
||||
names,
|
||||
|
@ -850,14 +849,13 @@ mod tests {
|
|||
}
|
||||
|
||||
async fn delete(
|
||||
namespace: &str,
|
||||
_namespace: &str,
|
||||
producer: &RSKafkaProducer,
|
||||
trace_collector: &Arc<RingBufferTraceCollector>,
|
||||
shard_index: ShardIndex,
|
||||
) -> DmlMeta {
|
||||
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
|
||||
let op = DmlOperation::Delete(DmlDelete::new(
|
||||
namespace,
|
||||
NamespaceId::new(42),
|
||||
DeletePredicate {
|
||||
range: TimestampRange::new(0, 1),
|
||||
|
|
|
@ -207,7 +207,6 @@ mod tests {
|
|||
|
||||
use super::*;
|
||||
|
||||
const NAMESPACE: &str = "bananas";
|
||||
const SHARD_INDEX: ShardIndex = ShardIndex::new(42);
|
||||
const TIMESTAMP_MILLIS: i64 = 1659990497000;
|
||||
|
||||
|
@ -233,7 +232,6 @@ mod tests {
|
|||
.collect();
|
||||
|
||||
DmlOperation::Write(DmlWrite::new(
|
||||
NAMESPACE.to_string(),
|
||||
NamespaceId::new(42),
|
||||
m,
|
||||
ids,
|
||||
|
|
|
@ -178,7 +178,6 @@ impl MockBufferSharedState {
|
|||
let (tables, names) = lp_to_batches(lp);
|
||||
let meta = DmlMeta::sequenced(sequence, iox_time::Time::from_timestamp_nanos(0), None, 0);
|
||||
self.push_write(DmlWrite::new(
|
||||
"foo",
|
||||
NamespaceId::new(42),
|
||||
tables,
|
||||
names,
|
||||
|
@ -905,7 +904,6 @@ mod tests {
|
|||
|
||||
let (tables, names) = lp_to_batches("upc user=1 100");
|
||||
let operation = DmlOperation::Write(DmlWrite::new(
|
||||
"test_db",
|
||||
NamespaceId::new(42),
|
||||
tables,
|
||||
names,
|
||||
|
|
Loading…
Reference in New Issue