diff --git a/dml/src/lib.rs b/dml/src/lib.rs index d08e49a199..1b8f08a27e 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -388,9 +388,18 @@ pub mod test_util { /// Asserts two writes are equal pub fn assert_writes_eq(a: &DmlWrite, b: &DmlWrite) { assert_eq!(a.namespace, b.namespace); - assert_eq!(a.meta(), b.meta()); assert_eq!(a.partition_key(), b.partition_key()); + // Depending on what implementation is under test ( :( ) different + // timestamp precisions may be used. + // + // Truncate them all to milliseconds (the lowest common denominator) so + // they are comparable. + assert_eq!( + truncate_timestamp_to_millis(a.meta()), + truncate_timestamp_to_millis(b.meta()) + ); + assert_eq!(a.table_count(), b.table_count()); for (table_name, a_batch) in a.tables() { @@ -412,4 +421,24 @@ pub mod test_util { _ => panic!("unexpected operation: {:?}", a), } } + + fn truncate_timestamp_to_millis(m: &DmlMeta) -> DmlMeta { + // Kafka supports millisecond precision in timestamps, so drop some + // precision from this producer timestamp in the metadata (which has + // nanosecond precision) to ensure the returned write is directly + // comparable to a write that has come through the write buffer. + // + // This mangling is to support testing comparisons only. + let timestamp = m + .producer_ts() + .expect("no producer timestamp in de-aggregated metadata"); + let timestamp = Time::from_timestamp_millis(timestamp.timestamp_millis()); + + DmlMeta::sequenced( + *m.sequence().unwrap(), + timestamp, + m.span_context().cloned(), + m.bytes_read().unwrap(), + ) + } }