fix: empty writes do not corrupt wal

hiltontj/empty-write-fix
Trevor Hilton 2025-04-05 15:04:18 -04:00
parent 6994cd2141
commit 4589989a23
2 changed files with 56 additions and 19 deletions

View File

@ -340,6 +340,10 @@ impl IntoResponse for Error {
.body(body)
.unwrap()
}
Self::WriteBuffer(err @ WriteBufferError::EmptyWrite) => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(err.to_string()))
.unwrap(),
Self::WriteBuffer(err @ WriteBufferError::ColumnDoesNotExist(_)) => {
let err: ErrorMessage<()> = ErrorMessage {
error: err.to_string(),

View File

@ -63,6 +63,9 @@ pub enum Error {
#[error("parsing for line protocol failed")]
ParseError(WriteLineError),
#[error("incoming write was empty")]
EmptyWrite,
#[error("column type mismatch for column {name}: existing: {existing:?}, new: {new:?}")]
ColumnTypeMismatch {
name: String,
@ -298,17 +301,25 @@ impl WriteBufferImpl {
}
};
let ops = vec![WalOp::Write(result.valid_data)];
// Only buffer to the WAL if the batch actually contains valid lines; with
// `accept_partial`, e.g., a write whose lines were all rejected yields an empty batch:
if result.line_count > 0 {
let ops = vec![WalOp::Write(result.valid_data)];
if no_sync {
self.wal.write_ops_unconfirmed(ops).await?;
} else {
// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;
if no_sync {
self.wal.write_ops_unconfirmed(ops).await?;
} else {
// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;
}
}
if result.line_count == 0 && result.errors.is_empty() {
return Err(Error::EmptyWrite);
}
// record metrics for lines written, rejected, and bytes written
@ -2955,15 +2966,37 @@ mod tests {
let buf = init().await;
do_writes_partial(
"cats",
buf.as_ref(),
&[TestWrite {
lp: "",
time_seconds: 1_000,
}],
)
.await;
// empty write should be rejected:
let err = buf
.write_lp(
NamespaceName::new("cats").unwrap(),
"",
Time::from_timestamp_nanos(1),
true,
Precision::Nanosecond,
false,
)
.await
.unwrap_err();
assert!(
matches!(err, Error::EmptyWrite),
"should get an empty write error"
);
// do a write with only invalid lines:
let res = buf
.write_lp(
NamespaceName::new("cats").unwrap(),
"not_valid_line_protocol",
Time::from_timestamp_nanos(1),
true,
Precision::Nanosecond,
false,
)
.await
.unwrap();
assert_eq!(0, res.line_count);
assert_eq!(1, res.invalid_lines.len());
drop(buf);