Merge pull request #8094 from influxdata/savage/individually-sequence-partitions-within-writes

feat(ingester): Assign individual sequence numbers for writes per partition
pull/24376/head
kodiakhq[bot] 2023-07-10 14:39:39 +00:00 committed by GitHub
commit 5521310005
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 336 additions and 110 deletions

View File

@ -99,6 +99,14 @@ impl Extend<SequenceNumber> for SequenceNumberSet {
}
}
impl Extend<SequenceNumberSet> for SequenceNumberSet {
fn extend<T: IntoIterator<Item = SequenceNumberSet>>(&mut self, iter: T) {
for new_set in iter {
self.add_set(&new_set);
}
}
}
impl FromIterator<SequenceNumber> for SequenceNumberSet {
fn from_iter<T: IntoIterator<Item = SequenceNumber>>(iter: T) -> Self {
Self(iter.into_iter().map(|v| v.get() as _).collect())
@ -174,6 +182,29 @@ mod tests {
assert!(a.contains(SequenceNumber::new(2)));
}
#[test]
fn test_extend_multiple_sets() {
let mut a = SequenceNumberSet::default();
a.add(SequenceNumber::new(7));
let b = [SequenceNumber::new(13), SequenceNumber::new(76)];
let c = [SequenceNumber::new(42), SequenceNumber::new(64)];
assert!(a.contains(SequenceNumber::new(7)));
for &num in [b, c].iter().flatten() {
assert!(!a.contains(num));
}
a.extend([
SequenceNumberSet::from_iter(b),
SequenceNumberSet::from_iter(c),
]);
assert!(a.contains(SequenceNumber::new(7)));
for &num in [b, c].iter().flatten() {
assert!(a.contains(num));
}
}
#[test]
fn test_collect() {
let collect_set = [SequenceNumber::new(4), SequenceNumber::new(2)];

View File

@ -1,5 +1,5 @@
//! Tests the `influxdb_iox debug` commands
use std::path::Path;
use std::{path::Path, time::Duration};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
@ -7,6 +7,7 @@ use assert_cmd::Command;
use futures::FutureExt;
use predicates::prelude::*;
use tempfile::TempDir;
use test_helpers::timeout::FutureTimeout;
use test_helpers_end_to_end::{
maybe_skip_integration, run_sql, MiniCluster, ServerFixture, Step, StepTest, StepTestState,
TestConfig,
@ -104,14 +105,20 @@ async fn build_catalog() {
// We can build a catalog and start up the server and run a query
let restarted = RestartedServer::build_catalog_and_start(&table_dir).await;
let batches = restarted.run_sql(sql, &namespace).await;
let batches = run_sql_until_non_empty(&restarted, sql, namespace.as_str())
.with_timeout(Duration::from_secs(2))
.await
.expect("timed out waiting for non-empty batches in result");
assert_batches_sorted_eq!(&expected, &batches);
// We can also rebuild a catalog from just the parquet files
let only_parquet_dir = copy_only_parquet_files(&table_dir);
let restarted =
RestartedServer::build_catalog_and_start(only_parquet_dir.path()).await;
let batches = restarted.run_sql(sql, &namespace).await;
let batches = run_sql_until_non_empty(&restarted, sql, namespace.as_str())
.with_timeout(Duration::from_secs(2))
.await
.expect("timed out waiting for non-empty batches in result");
assert_batches_sorted_eq!(&expected, &batches);
}
.boxed()
@ -122,6 +129,23 @@ async fn build_catalog() {
.await
}
/// Loops forever, running the SQL query against the [`RestartedServer`] given
/// until the result is non-empty. Callers are responsible for timing out the
/// function.
async fn run_sql_until_non_empty(
restarted: &RestartedServer,
sql: &str,
namespace: &str,
) -> Vec<RecordBatch> {
loop {
let batches = restarted.run_sql(sql, namespace).await;
if !batches.is_empty() {
return batches;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
/// An all in one instance, with data directory of `data_dir`
struct RestartedServer {
all_in_one: ServerFixture,

View File

@ -194,26 +194,18 @@ where
for op in ops {
let SequencedWalOp {
table_write_sequence_numbers, // TODO(savage): Use sequence numbers assigned per-partition
table_write_sequence_numbers,
op,
} = op;
let sequence_number = SequenceNumber::new(
*table_write_sequence_numbers
.values()
.next()
.expect("attempt to replay unsequenced wal entry"),
);
max_sequence = max_sequence.max(Some(sequence_number));
let op = match op {
Op::Write(w) => w,
Op::Delete(_) => unreachable!(),
Op::Persist(_) => unreachable!(),
};
debug!(?op, sequence_number = sequence_number.get(), "apply wal op");
let mut op_min_sequence_number = None;
let mut op_max_sequence_number = None;
// Reconstruct the ingest operation
let batches = decode_database_batch(&op)?;
@ -226,10 +218,18 @@ where
.into_iter()
.map(|(k, v)| {
let table_id = TableId::new(k);
let sequence_number = SequenceNumber::new(
*table_write_sequence_numbers
.get(&table_id)
.expect("attempt to apply unsequenced wal op"),
);
max_sequence = max_sequence.max(Some(sequence_number));
op_min_sequence_number = op_min_sequence_number.min(Some(sequence_number));
op_max_sequence_number = op_min_sequence_number.max(Some(sequence_number));
(
table_id,
// TODO(savage): Use table-partitioned sequence
// numbers here
TableData::new(table_id, PartitionedData::new(sequence_number, v)),
)
})
@ -239,6 +239,17 @@ where
None,
);
debug!(
?op,
op_min_sequence_number = op_min_sequence_number
.expect("attempt to apply unsequenced wal op")
.get(),
op_max_sequence_number = op_max_sequence_number
.expect("attempt to apply unsequenced wal op")
.get(),
"apply wal op"
);
// Apply the operation to the provided DML sink
sink.apply(IngestOp::Write(op))
.await
@ -270,9 +281,9 @@ mod tests {
dml_sink::mock_sink::MockDmlSink,
persist::queue::mock::MockPersistQueue,
test_util::{
assert_dml_writes_eq, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME,
assert_write_ops_eq, make_multi_table_write_op, make_write_op, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME,
},
wal::wal_sink::WalSink,
};
@ -300,6 +311,8 @@ mod tests {
}
}
const ALTERNATIVE_TABLE_NAME: &str = "arán";
#[tokio::test]
async fn test_replay() {
let dir = tempfile::tempdir().unwrap();
@ -329,18 +342,30 @@ mod tests {
),
None,
);
let op3 = make_write_op(
// Add a write hitting multiple tables for good measure
let op3 = make_multi_table_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
42,
[
(
ARBITRARY_TABLE_NAME.to_string().as_str(),
ARBITRARY_TABLE_ID,
SequenceNumber::new(42),
),
(
ALTERNATIVE_TABLE_NAME,
TableId::new(ARBITRARY_TABLE_ID.get() + 1),
SequenceNumber::new(43),
),
]
.into_iter(),
// Overwrite op2
&format!(
r#"{},region=Asturias temp=15 4242424242"#,
&*ARBITRARY_TABLE_NAME
r#"{},region=Asturias temp=15 4242424242
{},region=Mayo temp=12 4242424242"#,
&*ARBITRARY_TABLE_NAME, ALTERNATIVE_TABLE_NAME,
),
None,
);
// The write portion of this test.
@ -416,7 +441,7 @@ mod tests {
.await
.expect("failed to replay WAL");
assert_eq!(max_sequence_number, Some(SequenceNumber::new(42)));
assert_eq!(max_sequence_number, Some(SequenceNumber::new(43)));
// Assert the ops were pushed into the DmlSink exactly as generated.
let ops = mock_iter.sink.get_calls();
@ -427,9 +452,9 @@ mod tests {
IngestOp::Write(ref w2),
IngestOp::Write(ref w3)
] => {
assert_dml_writes_eq(w1.clone(), op1);
assert_dml_writes_eq(w2.clone(), op2);
assert_dml_writes_eq(w3.clone(), op3);
assert_write_ops_eq(w1.clone(), op1);
assert_write_ops_eq(w2.clone(), op2);
assert_write_ops_eq(w3.clone(), op3);
}
);

View File

@ -180,20 +180,21 @@ where
"received rpc write"
);
let sequence_number = self.timestamp.next();
// Construct the corresponding ingester write operation for the RPC payload
// Construct the corresponding ingester write operation for the RPC payload,
// independently sequencing the data contained by the write per-partition
let op = WriteOperation::new(
namespace_id,
batches
.into_iter()
.map(|(k, v)| {
let table_id = TableId::new(k);
let partition_sequence_number = self.timestamp.next();
(
table_id,
// TODO(savage): Sequence partitioned data independently within a
// write.
TableData::new(table_id, PartitionedData::new(sequence_number, v)),
TableData::new(
table_id,
PartitionedData::new(partition_sequence_number, v),
),
)
})
.collect(),
@ -226,17 +227,19 @@ mod tests {
column::{SemanticType, Values},
Column, DatabaseBatch, TableBatch,
};
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use super::*;
use crate::{
dml_payload::IngestOp,
dml_sink::mock_sink::MockDmlSink,
test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID},
test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID},
};
use crate::{dml_sink::mock_sink::MockDmlSink, test_util::ARBITRARY_PARTITION_KEY};
const PERSIST_QUEUE_DEPTH: usize = 42;
const ALTERNATIVE_TABLE_ID: TableId = TableId::new(76);
macro_rules! test_rpc_write {
(
$name:ident,
@ -315,6 +318,75 @@ mod tests {
}
);
test_rpc_write!(
apply_ok_independently_sequenced_partitions,
request = proto::WriteRequest {
payload: Some(DatabaseBatch {
database_id: ARBITRARY_NAMESPACE_ID.get(),
partition_key: ARBITRARY_PARTITION_KEY.to_string(),
table_batches: vec![
TableBatch {
table_id: ARBITRARY_TABLE_ID.get(),
columns: vec![Column {
column_name: String::from("time"),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![4242],
f64_values: vec![],
u64_values: vec![],
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count:1 ,
},
TableBatch {
table_id: ALTERNATIVE_TABLE_ID.get(),
columns: vec![Column {
column_name: String::from("time"),
semantic_type: SemanticType::Time.into(),
values: Some(Values {
i64_values: vec![7676],
f64_values: vec![],
u64_values: vec![],
string_values: vec![],
bool_values: vec![],
bytes_values: vec![],
packed_string_values: None,
interned_string_values: None,
}),
null_mask: vec![0],
}],
row_count: 1,
},
],
}),
},
sink_ret = Ok(()),
want_err = false,
want_calls = [IngestOp::Write(w)] => {
// Assert the properties of the applied IngestOp match the expected
// values. Notably a sequence number should be assigned _per partition_.
assert_eq!(w.namespace(), ARBITRARY_NAMESPACE_ID);
assert_eq!(w.tables().count(), 2);
assert_eq!(*w.partition_key(), *ARBITRARY_PARTITION_KEY);
let sequence_numbers = w.tables().map(|t| t.1.partitioned_data().sequence_number()).collect::<HashSet<_>>();
assert_eq!(
sequence_numbers,
[
SequenceNumber::new(1),
SequenceNumber::new(2),
]
.into_iter()
.collect::<HashSet<_>>(),
);
}
);
test_rpc_write!(
no_payload,
request = proto::WriteRequest { payload: None },

View File

@ -4,6 +4,7 @@ use data_types::{
partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey,
SequenceNumber, TableId,
};
use hashbrown::HashSet;
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
use lazy_static::lazy_static;
use mutable_batch_lp::lines_to_batches;
@ -312,6 +313,52 @@ pub(crate) fn make_write_op(
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), span_ctx)
}
/// Construct a [`WriteOperation`] with the specified parameters for LP covering
/// multiple separately sequenced table writes.
///
/// # Panics
///
/// This method panics if `table_sequence_numbers` contains a different number
/// of tables to the batches derived from `lines` OR if a [`SequenceNumber`]
/// is re-used within the write.
#[track_caller]
pub(crate) fn make_multi_table_write_op<
'a,
I: ExactSizeIterator<Item = (&'a str, TableId, SequenceNumber)>,
>(
partition_key: &PartitionKey,
namespace_id: NamespaceId,
table_sequence_numbers: I,
lines: &str,
) -> WriteOperation {
let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP");
assert_eq!(
tables_by_name.len(),
table_sequence_numbers.len(),
"number of tables in LP does not match number of table_sequence_numbers"
);
let mut seen_sequence_numbers = HashSet::<SequenceNumber>::new();
let tables_by_id = table_sequence_numbers
.map(|(table_name, table_id, sequence_number)| {
let mb = tables_by_name
.remove(table_name)
.expect("table name does not exist in LP");
assert!(
seen_sequence_numbers.insert(sequence_number),
"duplicate sequence number {sequence_number:?} observed"
);
(
table_id,
TableData::new(table_id, PartitionedData::new(sequence_number, mb)),
)
})
.collect();
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None)
}
pub(crate) async fn populate_catalog(
catalog: &dyn Catalog,
namespace: &str,
@ -332,17 +379,18 @@ pub(crate) async fn populate_catalog(
/// Assert `a` and `b` have identical metadata, and that when converting
/// them to Arrow batches they produces identical output.
#[track_caller]
pub(crate) fn assert_dml_writes_eq(a: WriteOperation, b: WriteOperation) {
pub(crate) fn assert_write_ops_eq(a: WriteOperation, b: WriteOperation) {
assert_eq!(a.namespace(), b.namespace(), "namespace");
assert_eq!(a.tables().count(), b.tables().count(), "table count");
assert_eq!(a.partition_key(), b.partition_key(), "partition key");
// Assert sequence numbers were reassigned
for (a_table, b_table) in a.tables().zip(b.tables()) {
assert_eq!(a_table.0, b_table.0, "table id mismatch");
assert_eq!(
a_table.1.partitioned_data().sequence_number(),
b_table.1.partitioned_data().sequence_number(),
"sequence number"
"sequence number mismatch"
);
}

View File

@ -696,22 +696,25 @@ mod tests {
let wal = Wal::new(&dir.path()).await.unwrap();
let w1 = test_data("m1,t=foo v=1i 1");
let w2 = test_data("m1,t=foo v=2i 2");
// Use multiple tables for a write to test per-partition sequencing is preserved
let w2 = test_data("m1,t=foo v=2i 2\nm2,u=bar v=1i 1");
let op1 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(w1),
};
let op2 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)]
.into_iter()
.collect(),
op: WalOp::Write(w2),
};
let op3 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
@ -720,7 +723,6 @@ mod tests {
wal.write_op(op3.clone());
wal.write_op(op4.clone()).changed().await.unwrap();
// TODO(savage): Returned SequenceNumberSet should reflect `partition_sequence_numbers` post-change.
let (closed, ids) = wal.rotate().unwrap();
let ops: Vec<SequencedWalOp> = wal
@ -733,7 +735,7 @@ mod tests {
// Assert the set has recorded the op IDs.
//
// Note that one op has a duplicate sequence number above!
assert_eq!(ids.len(), 3);
assert_eq!(ids.len(), 4);
// Assert the sequence number set contains the specified ops.
let ids = ids.iter().collect::<Vec<_>>();
@ -743,6 +745,7 @@ mod tests {
SequenceNumber::new(0),
SequenceNumber::new(1),
SequenceNumber::new(2),
SequenceNumber::new(3),
]
);
@ -753,9 +756,11 @@ mod tests {
.collect::<Vec<std::collections::HashMap<TableId, u64>>>(),
[
[(TableId::new(0), 0)].into_iter().collect(),
[(TableId::new(0), 1)].into_iter().collect(),
[(TableId::new(0), 2)].into_iter().collect(),
[(TableId::new(0), 2)].into_iter().collect(),
[(TableId::new(0), 1), (TableId::new(1), 2)]
.into_iter()
.collect(),
[(TableId::new(0), 3)].into_iter().collect(),
[(TableId::new(0), 3)].into_iter().collect(),
]
.into_iter()
.collect::<Vec<std::collections::HashMap<TableId, u64>>>(),
@ -808,7 +813,7 @@ mod tests {
let wal = Wal::new(dir.path()).await.unwrap();
let w1 = test_data("m1,t=foo v=1i 1");
let w2 = test_data("m2,u=foo w=2i 2");
let w2 = test_data("m1,t=foo v=2i 2\nm2,u=foo w=2i 2");
let w3 = test_data("m1,t=foo v=3i 3");
let op1 = SequencedWalOp {
@ -816,20 +821,22 @@ mod tests {
op: WalOp::Write(w1.to_owned()),
};
let op2 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)]
.into_iter()
.collect(),
op: WalOp::Write(w2.to_owned()),
};
let op3 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
// A third write entry coming after a delete and persist entry must still be yielded
let op5 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 4)].into_iter().collect(),
op: WalOp::Write(w3.to_owned()),
};

View File

@ -122,17 +122,12 @@ impl WriterIoThread {
.ops
.into_iter()
.map(|v| {
// TODO(savage): Extract all [`SequenceNumber`] used for
// the op and include them in the batch once the tables
// sequenced independently.
let id = SequenceNumber::new(
*v.table_write_sequence_numbers
.values()
.next()
.expect("attempt to encode unsequence wal operation")
as _,
);
(proto::SequencedWalOp::from(v), id)
let op_ids: SequenceNumberSet = v
.table_write_sequence_numbers
.values()
.map(|&id| SequenceNumber::new(id as _))
.collect();
(proto::SequencedWalOp::from(v), op_ids)
})
.unzip();
let proto_batch = proto::WalOpBatch { ops };

View File

@ -22,16 +22,16 @@ async fn crud() {
);
// Can write an entry to the open segment
let op = arbitrary_sequenced_wal_op(42);
let op = arbitrary_sequenced_wal_op([42, 43]);
let summary = unwrap_summary(wal.write_op(op)).await;
assert_eq!(summary.total_bytes, 126);
assert_eq!(summary.bytes_written, 110);
assert_eq!(summary.total_bytes, 140);
assert_eq!(summary.bytes_written, 124);
// Can write another entry; total_bytes accumulates
let op = arbitrary_sequenced_wal_op(43);
let op = arbitrary_sequenced_wal_op([44, 45]);
let summary = unwrap_summary(wal.write_op(op)).await;
assert_eq!(summary.total_bytes, 236);
assert_eq!(summary.bytes_written, 110);
assert_eq!(summary.total_bytes, 264);
assert_eq!(summary.bytes_written, 124);
// Still no closed segments
let closed = wal.closed_segments();
@ -42,10 +42,15 @@ async fn crud() {
// Can't read entries from the open segment; have to rotate first
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.size(), 236);
assert_eq!(closed_segment_details.size(), 264);
assert_eq!(
ids.iter().collect::<Vec<_>>(),
[SequenceNumber::new(42), SequenceNumber::new(43)]
[
SequenceNumber::new(42),
SequenceNumber::new(43),
SequenceNumber::new(44),
SequenceNumber::new(45)
]
);
// There's one closed segment
@ -53,20 +58,25 @@ async fn crud() {
let closed_segment_ids: Vec<_> = closed.iter().map(|c| c.id()).collect();
assert_eq!(closed_segment_ids, &[closed_segment_details.id()]);
// Can read the written entries from the closed segment,
// ensuring the per-partition sequence numbers match up to the current
// op-level sequence number while it is the source of truth.
// Can read the written entries from the closed segment, ensuring that the
// per-partition sequence numbers are preserved.
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
let op = reader.next().unwrap().unwrap();
op[0]
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 42));
let op = reader.next().unwrap().unwrap();
op[0]
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([42, 43]),);
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 43));
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([44, 45]),);
// Can delete a segment, leaving no closed segments again
wal.delete(closed_segment_details.id()).await.unwrap();
@ -85,10 +95,10 @@ async fn replay() {
// WAL.
{
let wal = wal::Wal::new(dir.path()).await.unwrap();
let op = arbitrary_sequenced_wal_op(42);
let op = arbitrary_sequenced_wal_op([42]);
let _ = unwrap_summary(wal.write_op(op)).await;
wal.rotate().unwrap();
let op = arbitrary_sequenced_wal_op(43);
let op = arbitrary_sequenced_wal_op([43, 44]);
let _ = unwrap_summary(wal.write_op(op)).await;
}
@ -102,22 +112,27 @@ async fn replay() {
assert_eq!(closed_segment_ids.len(), 2);
// Can read the written entries from the previously closed segment
// ensuring the per-partition sequence numbers match up to the current
// op-level sequence number while it is the source of truth.
// ensuring the per-partition sequence numbers are preserved.
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap();
let op = reader.next().unwrap().unwrap();
op[0]
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 42));
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([42]));
// Can read the written entries from the previously open segment
let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap();
let op = reader.next().unwrap().unwrap();
op[0]
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 43));
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([43, 44]));
}
#[tokio::test]
@ -128,19 +143,20 @@ async fn ordering() {
{
let wal = wal::Wal::new(dir.path()).await.unwrap();
let op = arbitrary_sequenced_wal_op(42);
let _ = unwrap_summary(wal.write_op(op)).await;
// TODO(savage): These will need to return the
// partition_sequence_numbers and be checked there.
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(42)]);
let op = arbitrary_sequenced_wal_op(43);
let op = arbitrary_sequenced_wal_op([42, 43]);
let _ = unwrap_summary(wal.write_op(op)).await;
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(43)]);
assert_eq!(
ids.iter().collect::<Vec<_>>(),
[SequenceNumber::new(42), SequenceNumber::new(43)]
);
let op = arbitrary_sequenced_wal_op(44);
let op = arbitrary_sequenced_wal_op([44]);
let _ = unwrap_summary(wal.write_op(op)).await;
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(44)]);
let op = arbitrary_sequenced_wal_op([45]);
let _ = unwrap_summary(wal.write_op(op)).await;
}
@ -164,13 +180,21 @@ async fn ordering() {
assert!(ids.is_empty());
}
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {
let w = test_data("m1,t=foo v=1i 1");
fn arbitrary_sequenced_wal_op<I: IntoIterator<Item = u64>>(sequence_numbers: I) -> SequencedWalOp {
let sequence_numbers = sequence_numbers.into_iter().collect::<Vec<_>>();
let lp = sequence_numbers
.iter()
.enumerate()
.fold(String::new(), |string, (idx, _)| {
string + &format!("m{},t=foo v=1i 1\n", idx)
});
let w = test_data(lp.as_str());
SequencedWalOp {
table_write_sequence_numbers: w
.table_batches
.iter()
.map(|table_batch| (TableId::new(table_batch.table_id), sequence_number))
.zip(sequence_numbers.iter())
.map(|(table_batch, &id)| (TableId::new(table_batch.table_id), id))
.collect(),
op: WalOp::Write(w),
}