Merge branch 'main' into dom/remove-comment

pull/24376/head
kodiakhq[bot] 2022-12-02 17:58:33 +00:00 committed by GitHub
commit 228c81c6fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 9 deletions

View File

@ -65,6 +65,10 @@ where
// enforced within the buffer.
let mut max_sequence = None;
for (index, file) in files.into_iter().enumerate() {
// Map 0-based iter index to 1 based file count
let file_number = index + 1;
// Read the segment
let reader = read_handle
.reader_for_segment(file.id())
.await
@ -73,7 +77,7 @@ where
// Emit a log entry so progress can be tracked (and a problematic file
// be identified should an explosion happen during replay).
info!(
file_number = index + 1, // map 0-based index to 1 based file count
file_number,
n_files,
file_id = %file.id(),
size = file.size(),
@ -81,8 +85,34 @@ where
);
// Replay this segment file
let seq = replay_file(reader, sink).await?;
max_sequence = max_sequence.max(seq);
match replay_file(reader, sink).await? {
v @ Some(_) => max_sequence = max_sequence.max(v),
None => {
// This file was empty and should be deleted.
warn!(
file_number,
n_files,
file_id = %file.id(),
size = file.size(),
"dropping empty wal segment",
);
// TODO(dom:test): empty WAL replay
// A failure to delete an empty file should not prevent WAL
// replay from continuing.
if let Err(error) = wal.rotation_handle().delete(file.id()).await {
error!(
file_number,
n_files,
file_id = %file.id(),
size = file.size(),
%error,
"error dropping empty wal segment",
);
}
}
};
}
info!(
@ -94,7 +124,7 @@ where
}
/// Replay the entries in `file`, applying them to `buffer`. Returns the highest
/// sequence number observed in the file, if any (files may be empty).
/// sequence number observed in the file, or [`None`] if the file was empty.
async fn replay_file<T>(
mut file: wal::ClosedSegmentFileReader,
sink: &T,

View File

@ -14,6 +14,7 @@ use schema::Projection;
/// # Panics
///
/// This method panics if `lines` contains data for more than one table.
#[track_caller]
pub(crate) fn make_write_op(
partition_key: &PartitionKey,
namespace_id: NamespaceId,
@ -22,12 +23,22 @@ pub(crate) fn make_write_op(
sequence_number: i64,
lines: &str,
) -> DmlWrite {
let mut tables_by_name = lines_to_batches(lines, 0).unwrap();
assert_eq!(tables_by_name.len(), 1);
let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP");
assert_eq!(
tables_by_name.len(),
1,
"make_write_op only supports 1 table in the LP"
);
let tables_by_id = [(
table_id,
tables_by_name
.remove(table_name)
.expect("table_name does not exist in LP"),
)]
.into_iter()
.collect();
let tables_by_id = [(table_id, tables_by_name.remove(table_name).unwrap())]
.into_iter()
.collect();
DmlWrite::new(
namespace_id,
tables_by_id,