fix: don't update last write time on failed writes

Fixes #1905
pull/24376/head
Paul Dix 2021-07-07 14:50:03 -04:00
parent ac1c7a5e07
commit cc350bb1ea
1 changed files with 40 additions and 2 deletions

View File

@ -648,7 +648,6 @@ impl Db {
.get_or_create_partition(table_batch.name(), partition_key);
let mut partition = partition.write();
partition.update_last_write_at();
let (min_time, max_time) =
table_batch.min_max_time().context(TableBatchTimeError)?;
@ -656,7 +655,6 @@ impl Db {
match partition.open_chunk() {
Some(chunk) => {
let mut chunk = chunk.write();
chunk.record_write();
let chunk_id = chunk.id();
let mb_chunk =
@ -674,6 +672,7 @@ impl Db {
}
continue;
};
chunk.record_write();
}
None => {
let metrics = self.metrics_registry.register_domain_with_labels(
@ -701,6 +700,7 @@ impl Db {
partition.create_open_chunk(mb_chunk);
}
};
partition.update_last_write_at();
match partition.persistence_windows() {
Some(windows) => {
@ -1733,6 +1733,44 @@ mod tests {
}
}
#[tokio::test]
async fn failed_write_doesnt_update_last_write_at() {
let db = Arc::new(make_db().await.db);
let before_create = Utc::now();
let partition_key = "1970-01-01T00";
write_lp(&db, "cpu bar=1 10").await;
let after_write = Utc::now();
let (last_write_prev, chunk_last_write_prev) = {
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let partition = partition.read();
assert_ne!(partition.created_at(), partition.last_write_at());
assert!(before_create < partition.last_write_at());
assert!(after_write > partition.last_write_at());
let chunk = partition.open_chunk().unwrap();
let chunk = chunk.read();
(
partition.last_write_at(),
chunk.time_of_last_write().unwrap(),
)
};
let entry = lp_to_entry("cpu bar=true 10");
let result = db.store_entry(entry).await;
assert!(result.is_err());
{
let partition = db.catalog.partition("cpu", partition_key).unwrap();
let partition = partition.read();
assert_eq!(last_write_prev, partition.last_write_at());
let chunk = partition.open_chunk().unwrap();
let chunk = chunk.read();
assert_eq!(chunk_last_write_prev, chunk.time_of_last_write().unwrap());
}
}
#[tokio::test]
async fn write_updates_persistence_windows() {
// Writes should update the persistence windows when there