fix: Remove namespace name field from DmlWrite and DmlDelete
But leave the argument in their constructors for now. Not all numbers in tests can be 42, Dom.pull/24376/head
parent
c203e8295f
commit
f78195f7c7
|
@ -151,14 +151,6 @@ impl DmlOperation {
|
|||
}
|
||||
}
|
||||
|
||||
/// Namespace associated with this operation
|
||||
pub fn namespace(&self) -> &str {
|
||||
match self {
|
||||
Self::Write(w) => w.namespace(),
|
||||
Self::Delete(d) => d.namespace(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Namespace catalog ID associated with this operation
|
||||
pub fn namespace_id(&self) -> NamespaceId {
|
||||
match self {
|
||||
|
@ -184,7 +176,6 @@ impl From<DmlDelete> for DmlOperation {
|
|||
#[derive(Debug, Clone)]
|
||||
pub struct DmlWrite {
|
||||
/// The namespace being written to
|
||||
namespace: String,
|
||||
namespace_id: NamespaceId,
|
||||
/// Writes to individual tables keyed by table name
|
||||
tables: HashMap<String, MutableBatch>,
|
||||
|
@ -208,7 +199,7 @@ impl DmlWrite {
|
|||
/// - a MutableBatch is empty
|
||||
/// - a MutableBatch lacks an i64 "time" column
|
||||
pub fn new(
|
||||
namespace: impl Into<String>,
|
||||
_namespace: impl Into<String>,
|
||||
namespace_id: NamespaceId,
|
||||
tables: HashMap<String, MutableBatch>,
|
||||
table_ids: HashMap<String, TableId>,
|
||||
|
@ -236,7 +227,6 @@ impl DmlWrite {
|
|||
}
|
||||
|
||||
Self {
|
||||
namespace: namespace.into(),
|
||||
tables,
|
||||
table_ids,
|
||||
partition_key,
|
||||
|
@ -247,11 +237,6 @@ impl DmlWrite {
|
|||
}
|
||||
}
|
||||
|
||||
/// Namespace associated with this write
|
||||
pub fn namespace(&self) -> &str {
|
||||
&self.namespace
|
||||
}
|
||||
|
||||
/// Metadata associated with this write
|
||||
pub fn meta(&self) -> &DmlMeta {
|
||||
&self.meta
|
||||
|
@ -343,7 +328,6 @@ impl DmlWrite {
|
|||
/// A delete operation
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct DmlDelete {
|
||||
namespace: String,
|
||||
namespace_id: NamespaceId,
|
||||
predicate: DeletePredicate,
|
||||
table_name: Option<NonEmptyString>,
|
||||
|
@ -353,14 +337,13 @@ pub struct DmlDelete {
|
|||
impl DmlDelete {
|
||||
/// Create a new [`DmlDelete`]
|
||||
pub fn new(
|
||||
namespace: impl Into<String>,
|
||||
_namespace: impl Into<String>,
|
||||
namespace_id: NamespaceId,
|
||||
predicate: DeletePredicate,
|
||||
table_name: Option<NonEmptyString>,
|
||||
meta: DmlMeta,
|
||||
) -> Self {
|
||||
Self {
|
||||
namespace: namespace.into(),
|
||||
namespace_id,
|
||||
predicate,
|
||||
table_name,
|
||||
|
@ -368,11 +351,6 @@ impl DmlDelete {
|
|||
}
|
||||
}
|
||||
|
||||
/// Namespace associated with this delete
|
||||
pub fn namespace(&self) -> &str {
|
||||
&self.namespace
|
||||
}
|
||||
|
||||
/// Returns the table_name for this delete
|
||||
pub fn table_name(&self) -> Option<&str> {
|
||||
self.table_name.as_deref()
|
||||
|
|
|
@ -377,7 +377,7 @@ something clever.",
|
|||
shard_index=%self.shard_index,
|
||||
shard_id=%self.shard_id,
|
||||
op_size=op.size(),
|
||||
op_namespace=op.namespace(),
|
||||
op_namespace_id=op.namespace_id().get(),
|
||||
?op_sequence_number,
|
||||
"decoded dml operation"
|
||||
);
|
||||
|
@ -536,8 +536,8 @@ mod tests {
|
|||
static TEST_SHARD_INDEX: ShardIndex = ShardIndex::new(42);
|
||||
static TEST_TOPIC_NAME: &str = "topic_name";
|
||||
|
||||
// Return a DmlWrite with the given namespace and a single table.
|
||||
fn make_write(name: impl Into<String>, write_time: u64) -> DmlWrite {
|
||||
// Return a DmlWrite with the given namespace ID and a single table.
|
||||
fn make_write(namespace_id: i64, write_time: u64) -> DmlWrite {
|
||||
let tables = lines_to_batches("bananas level=42 4242", 0).unwrap();
|
||||
let ids = tables
|
||||
.keys()
|
||||
|
@ -553,8 +553,8 @@ mod tests {
|
|||
42,
|
||||
);
|
||||
DmlWrite::new(
|
||||
name,
|
||||
NamespaceId::new(42),
|
||||
"deprecated",
|
||||
NamespaceId::new(namespace_id),
|
||||
tables,
|
||||
ids,
|
||||
"1970-01-01".into(),
|
||||
|
@ -562,8 +562,8 @@ mod tests {
|
|||
)
|
||||
}
|
||||
|
||||
// Return a DmlDelete with the given namespace.
|
||||
fn make_delete(name: impl Into<String>, write_time: u64) -> DmlDelete {
|
||||
// Return a DmlDelete with the given namespace ID.
|
||||
fn make_delete(namespace_id: i64, write_time: u64) -> DmlDelete {
|
||||
let pred = DeletePredicate {
|
||||
range: TimestampRange::new(1, 2),
|
||||
exprs: vec![],
|
||||
|
@ -576,7 +576,13 @@ mod tests {
|
|||
None,
|
||||
42,
|
||||
);
|
||||
DmlDelete::new(name, NamespaceId::new(42), pred, None, sequence)
|
||||
DmlDelete::new(
|
||||
"deprecated",
|
||||
NamespaceId::new(namespace_id),
|
||||
pred,
|
||||
None,
|
||||
sequence,
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -798,14 +804,14 @@ mod tests {
|
|||
write_ok,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![
|
||||
vec![Ok(DmlOperation::Write(make_write("bananas", 42)))]
|
||||
vec![Ok(DmlOperation::Write(make_write(1111, 42)))]
|
||||
],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 42,
|
||||
want_reset = 0,
|
||||
want_err_metrics = [],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -814,14 +820,14 @@ mod tests {
|
|||
delete_ok,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![
|
||||
vec![Ok(DmlOperation::Delete(make_delete("platanos", 24)))]
|
||||
vec![Ok(DmlOperation::Delete(make_delete(1111, 24)))]
|
||||
],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 24,
|
||||
want_reset = 0,
|
||||
want_err_metrics = [],
|
||||
want_sink = [DmlOperation::Delete(op)] => {
|
||||
assert_eq!(op.namespace(), "platanos");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -832,7 +838,7 @@ mod tests {
|
|||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![
|
||||
Err(WriteBufferError::new(WriteBufferErrorKind::IO, "explosions")),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 13)))
|
||||
Ok(DmlOperation::Write(make_write(1111, 13)))
|
||||
]],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 13,
|
||||
|
@ -846,7 +852,7 @@ mod tests {
|
|||
"skipped_sequence_number_amount" => 0
|
||||
],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -854,8 +860,11 @@ mod tests {
|
|||
non_fatal_stream_offset_error,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![
|
||||
Err(WriteBufferError::new(WriteBufferErrorKind::SequenceNumberNoLongerExists, "explosions")),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 31)))
|
||||
Err(WriteBufferError::new(
|
||||
WriteBufferErrorKind::SequenceNumberNoLongerExists,
|
||||
"explosions"
|
||||
)),
|
||||
Ok(DmlOperation::Write(make_write(1111, 31)))
|
||||
]],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 31,
|
||||
|
@ -868,7 +877,7 @@ mod tests {
|
|||
"skipped_sequence_number_amount" => 0
|
||||
],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -884,7 +893,7 @@ mod tests {
|
|||
)
|
||||
)
|
||||
],
|
||||
vec![Ok(DmlOperation::Write(make_write("bananas", 31)))],
|
||||
vec![Ok(DmlOperation::Write(make_write(1111, 31)))],
|
||||
],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 31,
|
||||
|
@ -897,7 +906,7 @@ mod tests {
|
|||
"skipped_sequence_number_amount" => 2
|
||||
],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -906,7 +915,7 @@ mod tests {
|
|||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![
|
||||
Err(WriteBufferError::new(WriteBufferErrorKind::InvalidData, "explosions")),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 50)))
|
||||
Ok(DmlOperation::Write(make_write(1111, 50)))
|
||||
]],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 50,
|
||||
|
@ -919,7 +928,7 @@ mod tests {
|
|||
"skipped_sequence_number_amount" => 0
|
||||
],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -928,7 +937,7 @@ mod tests {
|
|||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![
|
||||
Err(WriteBufferError::new(WriteBufferErrorKind::Unknown, "explosions")),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 60)))
|
||||
Ok(DmlOperation::Write(make_write(1111, 60)))
|
||||
]],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 60,
|
||||
|
@ -941,7 +950,7 @@ mod tests {
|
|||
"skipped_sequence_number_amount" => 0
|
||||
],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -965,12 +974,17 @@ mod tests {
|
|||
reports_last_ttbr,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![
|
||||
Ok(DmlOperation::Write(make_write("bananas", 1))),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 2))),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 3))),
|
||||
Ok(DmlOperation::Write(make_write("bananas", 42))),
|
||||
Ok(DmlOperation::Write(make_write(1111, 1))),
|
||||
Ok(DmlOperation::Write(make_write(1111, 2))),
|
||||
Ok(DmlOperation::Write(make_write(1111, 3))),
|
||||
Ok(DmlOperation::Write(make_write(1111, 42))),
|
||||
]],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true)), Ok(DmlApplyAction::Applied(false)), Ok(DmlApplyAction::Applied(true)), Ok(DmlApplyAction::Applied(false)),],
|
||||
sink_rets = [
|
||||
Ok(DmlApplyAction::Applied(true)),
|
||||
Ok(DmlApplyAction::Applied(false)),
|
||||
Ok(DmlApplyAction::Applied(true)),
|
||||
Ok(DmlApplyAction::Applied(false)),
|
||||
],
|
||||
want_ttbr = 42,
|
||||
want_reset = 0,
|
||||
want_err_metrics = [
|
||||
|
@ -990,8 +1004,8 @@ mod tests {
|
|||
non_fatal_sink_error,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![
|
||||
Ok(DmlOperation::Write(make_write("bad_op", 1))),
|
||||
Ok(DmlOperation::Write(make_write("good_op", 2)))
|
||||
Ok(DmlOperation::Write(make_write(1111, 1))),
|
||||
Ok(DmlOperation::Write(make_write(2222, 2)))
|
||||
]],
|
||||
sink_rets = [
|
||||
Err(crate::data::Error::NamespaceNotFound{namespace: "bananas".to_string() }),
|
||||
|
@ -1010,14 +1024,14 @@ mod tests {
|
|||
DmlOperation::Write(_), // First call into sink is bad_op, returning an error
|
||||
DmlOperation::Write(op), // Second call succeeds
|
||||
] => {
|
||||
assert_eq!(op.namespace(), "good_op");
|
||||
assert_eq!(op.namespace_id().get(), 2222);
|
||||
}
|
||||
);
|
||||
|
||||
test_stream_handler!(
|
||||
skipped_op_no_ttbr,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![vec![Ok(DmlOperation::Write(make_write("some_op", 1)))]],
|
||||
stream_ops = vec![vec![Ok(DmlOperation::Write(make_write(1111, 1)))]],
|
||||
sink_rets = [Ok(DmlApplyAction::Skipped)],
|
||||
want_ttbr = 0,
|
||||
want_reset = 0,
|
||||
|
@ -1025,7 +1039,7 @@ mod tests {
|
|||
want_sink = [
|
||||
DmlOperation::Write(op),
|
||||
] => {
|
||||
assert_eq!(op.namespace(), "some_op");
|
||||
assert_eq!(op.namespace_id().get(), 1111);
|
||||
}
|
||||
);
|
||||
|
||||
|
|
|
@ -509,7 +509,7 @@ mod tests {
|
|||
.expect("write should have been successful");
|
||||
assert_matches!(got, DmlOperation::Delete(d) => {
|
||||
assert_eq!(d.table_name(), Some(TABLE));
|
||||
assert_eq!(d.namespace(), &*ns);
|
||||
assert_eq!(d.namespace_id().get(), 42);
|
||||
assert_eq!(*d.predicate(), predicate);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -179,7 +179,6 @@ async fn test_write_ok() {
|
|||
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Write(w))] => {
|
||||
assert_eq!(w.namespace(), "bananas_test");
|
||||
assert!(w.table("platanos").is_some());
|
||||
});
|
||||
|
||||
|
@ -406,7 +405,6 @@ async fn test_write_propagate_ids() {
|
|||
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Write(w))] => {
|
||||
assert_eq!(w.namespace(), "bananas_test");
|
||||
assert_eq!(w.namespace_id(), ns.id);
|
||||
assert!(w.table("platanos").is_some());
|
||||
|
||||
|
@ -458,7 +456,6 @@ async fn test_delete_propagate_ids() {
|
|||
let writes = ctx.write_buffer_state().get_messages(ShardIndex::new(0));
|
||||
assert_eq!(writes.len(), 1);
|
||||
assert_matches!(writes.as_slice(), [Ok(DmlOperation::Delete(w))] => {
|
||||
assert_eq!(w.namespace(), "bananas_test");
|
||||
assert_eq!(w.namespace_id(), ns.id);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -332,13 +332,11 @@ mod tests {
|
|||
)
|
||||
.expect("failed to decode valid wire format");
|
||||
|
||||
assert_eq!("deprecated", got.namespace());
|
||||
let got = match got {
|
||||
DmlOperation::Write(w) => w,
|
||||
_ => panic!("wrong op type"),
|
||||
};
|
||||
|
||||
assert_eq!("deprecated", got.namespace());
|
||||
assert_eq!(w.namespace_id(), got.namespace_id());
|
||||
assert_eq!(w.table_count(), got.table_count());
|
||||
assert_eq!(w.min_timestamp(), got.min_timestamp());
|
||||
|
|
|
@ -311,7 +311,7 @@ mod tests {
|
|||
.try_push(write.clone())
|
||||
.expect("aggregate call should succeed");
|
||||
match res {
|
||||
TryPush::NoCapacity(res) => assert_eq!(res.namespace(), write.namespace()),
|
||||
TryPush::NoCapacity(res) => assert_eq!(res.namespace_id(), write.namespace_id()),
|
||||
TryPush::Aggregated(_) => panic!("expected no capacity"),
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue