refactor(ingester): Use multi-table write op test util for wal_sink test
parent
9ca0abfe0d
commit
b4a5d994d7
|
@ -181,12 +181,10 @@ mod tests {
|
|||
use assert_matches::assert_matches;
|
||||
use data_types::{SequenceNumber, TableId};
|
||||
use lazy_static::lazy_static;
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use wal::Wal;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
dml_payload::write::{PartitionedData, TableData, WriteOperation},
|
||||
dml_sink::mock_sink::MockDmlSink,
|
||||
test_util::{
|
||||
make_multi_table_write_op, ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY,
|
||||
|
@ -196,57 +194,36 @@ mod tests {
|
|||
|
||||
lazy_static! {
|
||||
static ref ALTERNATIVE_TABLE_NAME: &'static str = "arán";
|
||||
static ref ALTERNATIVE_TABLE_ID: TableId = TableId::new(ARBITRARY_TABLE_ID.get() + 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_append() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
||||
const ALTERNATIVE_TABLE_ID: TableId = TableId::new(45);
|
||||
// Generate a test op containing writes for multiple tables that will
|
||||
// be appended and read back
|
||||
let mut tables_by_name = lines_to_batches(
|
||||
format!(
|
||||
"{},region=Madrid temp=35 4242424242\n\
|
||||
{},region=Galway temp=25 7676767676",
|
||||
&*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME
|
||||
)
|
||||
.as_str(),
|
||||
0,
|
||||
)
|
||||
.expect("invalid line proto");
|
||||
let op = WriteOperation::new(
|
||||
let op = make_multi_table_write_op(
|
||||
&ARBITRARY_PARTITION_KEY,
|
||||
ARBITRARY_NAMESPACE_ID,
|
||||
[
|
||||
(
|
||||
ARBITRARY_TABLE_NAME.to_string().as_str(),
|
||||
ARBITRARY_TABLE_ID,
|
||||
TableData::new(
|
||||
ARBITRARY_TABLE_ID,
|
||||
PartitionedData::new(
|
||||
SequenceNumber::new(42),
|
||||
tables_by_name
|
||||
.remove(&ARBITRARY_TABLE_NAME.to_string())
|
||||
.expect("table does not exist in LP"),
|
||||
),
|
||||
),
|
||||
SequenceNumber::new(42),
|
||||
),
|
||||
(
|
||||
ALTERNATIVE_TABLE_ID,
|
||||
TableData::new(
|
||||
ALTERNATIVE_TABLE_ID,
|
||||
PartitionedData::new(
|
||||
SequenceNumber::new(43),
|
||||
tables_by_name
|
||||
.remove(&ALTERNATIVE_TABLE_NAME.to_string())
|
||||
.expect("alternative table does not exist in LP"),
|
||||
),
|
||||
),
|
||||
&ALTERNATIVE_TABLE_NAME,
|
||||
*ALTERNATIVE_TABLE_ID,
|
||||
SequenceNumber::new(43),
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
ARBITRARY_PARTITION_KEY.clone(),
|
||||
None,
|
||||
.into_iter(),
|
||||
&format!(
|
||||
r#"{},region=Madrid temp=35,climate="dry" 4242424242
|
||||
{},region=Belfast temp=14,climate="wet" 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME,
|
||||
),
|
||||
);
|
||||
|
||||
// The write portion of this test.
|
||||
|
@ -291,7 +268,7 @@ mod tests {
|
|||
let read_op = assert_matches!(&*ops, [op] => op, "expected 1 DML operation");
|
||||
assert_eq!(
|
||||
read_op.table_write_sequence_numbers,
|
||||
[(ARBITRARY_TABLE_ID, 42), (ALTERNATIVE_TABLE_ID, 43)]
|
||||
[(ARBITRARY_TABLE_ID, 42), (*ALTERNATIVE_TABLE_ID, 43)]
|
||||
.into_iter()
|
||||
.collect::<std::collections::HashMap<TableId, u64>>()
|
||||
);
|
||||
|
@ -340,7 +317,7 @@ mod tests {
|
|||
),
|
||||
(
|
||||
&ALTERNATIVE_TABLE_NAME,
|
||||
TableId::new(ARBITRARY_TABLE_ID.get() + 1),
|
||||
*ALTERNATIVE_TABLE_ID,
|
||||
SequenceNumber::new(43),
|
||||
),
|
||||
]
|
||||
|
|
Loading…
Reference in New Issue