diff --git a/ingester/src/wal/wal_sink.rs b/ingester/src/wal/wal_sink.rs index 2cc65a1ccb..a1f7a1b4dd 100644 --- a/ingester/src/wal/wal_sink.rs +++ b/ingester/src/wal/wal_sink.rs @@ -181,12 +181,10 @@ mod tests { use assert_matches::assert_matches; use data_types::{SequenceNumber, TableId}; use lazy_static::lazy_static; - use mutable_batch_lp::lines_to_batches; use wal::Wal; use super::*; use crate::{ - dml_payload::write::{PartitionedData, TableData, WriteOperation}, dml_sink::mock_sink::MockDmlSink, test_util::{ make_multi_table_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, @@ -196,57 +194,36 @@ mod tests { lazy_static! { static ref ALTERNATIVE_TABLE_NAME: &'static str = "arĂ¡n"; + static ref ALTERNATIVE_TABLE_ID: TableId = TableId::new(ARBITRARY_TABLE_ID.get() + 1); } #[tokio::test] async fn test_append() { let dir = tempfile::tempdir().unwrap(); - const ALTERNATIVE_TABLE_ID: TableId = TableId::new(45); // Generate a test op containing writes for multiple tables that will // be appended and read back - let mut tables_by_name = lines_to_batches( - format!( - "{},region=Madrid temp=35 4242424242\n\ - {},region=Galway temp=25 7676767676", - &*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME - ) - .as_str(), - 0, - ) - .expect("invalid line proto"); - let op = WriteOperation::new( + let op = make_multi_table_write_op( + &ARBITRARY_PARTITION_KEY, ARBITRARY_NAMESPACE_ID, [ ( + ARBITRARY_TABLE_NAME.to_string().as_str(), ARBITRARY_TABLE_ID, - TableData::new( - ARBITRARY_TABLE_ID, - PartitionedData::new( - SequenceNumber::new(42), - tables_by_name - .remove(&ARBITRARY_TABLE_NAME.to_string()) - .expect("table does not exist in LP"), - ), - ), + SequenceNumber::new(42), ), ( - ALTERNATIVE_TABLE_ID, - TableData::new( - ALTERNATIVE_TABLE_ID, - PartitionedData::new( - SequenceNumber::new(43), - tables_by_name - .remove(&ALTERNATIVE_TABLE_NAME.to_string()) - .expect("alternative table does not exist in LP"), - ), - ), + &ALTERNATIVE_TABLE_NAME, + *ALTERNATIVE_TABLE_ID, + SequenceNumber::new(43), ), ] - .into_iter() - .collect(), - ARBITRARY_PARTITION_KEY.clone(), - None, + .into_iter(), + &format!( + r#"{},region=Madrid temp=35,climate="dry" 4242424242 + {},region=Belfast temp=14,climate="wet" 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME, + ), ); // The write portion of this test. @@ -291,7 +268,7 @@ mod tests { let read_op = assert_matches!(&*ops, [op] => op, "expected 1 DML operation"); assert_eq!( read_op.table_write_sequence_numbers, - [(ARBITRARY_TABLE_ID, 42), (ALTERNATIVE_TABLE_ID, 43)] + [(ARBITRARY_TABLE_ID, 42), (*ALTERNATIVE_TABLE_ID, 43)] .into_iter() .collect::>() ); @@ -340,7 +317,7 @@ mod tests { ), ( &ALTERNATIVE_TABLE_NAME, - TableId::new(ARBITRARY_TABLE_ID.get() + 1), + *ALTERNATIVE_TABLE_ID, SequenceNumber::new(43), ), ]