From 825c19d726d3c0c104d39777d6e7d00a33bece49 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 16 Aug 2021 13:21:30 +0200 Subject: [PATCH] fix: disallow dropping unpersted chunks from persisted DB It doesn't play well w/ replay at the moment since we would forget which sequence numbers we've already seen. Fixes #2291. --- server/src/db.rs | 33 ++++++++++++++++++++++++++++++++ server/src/db/lifecycle/drop.rs | 12 +++++++++++- server/src/db/lifecycle/error.rs | 3 +++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/db.rs b/server/src/db.rs index f88da1f5e0..f7a8e633d8 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -4218,6 +4218,39 @@ mod tests { ); } + #[tokio::test] + async fn drop_unpersisted_chunk_on_persisted_db() { + // We don't support dropping unpersisted chunks from a persisted DB because we would forget the write buffer + // progress (partition checkpoints are only created when new parquet files are stored). + // See https://github.com/influxdata/influxdb_iox/issues/2291 + let test_db = TestDb::builder() + .lifecycle_rules(LifecycleRules { + persist: true, + ..Default::default() + }) + .build() + .await; + let db = Arc::new(test_db.db); + + write_lp(db.as_ref(), "cpu bar=1 10").await; + + let partition_key = "1970-01-01T00"; + let chunks = db.partition_chunk_summaries(partition_key); + assert_eq!(chunks.len(), 1); + let chunk_id = chunks[0].id; + + let err = db + .drop_chunk("cpu", partition_key, chunk_id) + .await + .unwrap_err(); + assert!(matches!( + err, + Error::LifecycleError { + source: super::lifecycle::Error::CannotDropUnpersistedChunk { .. } + } + )); + } + async fn create_parquet_chunk(db: &Arc) -> (String, String, u32) { write_lp(db, "cpu bar=1 10").await; let partition_key = "1970-01-01T00"; diff --git a/server/src/db/lifecycle/drop.rs b/server/src/db/lifecycle/drop.rs index ad737ec911..d2710cb34b 100644 --- a/server/src/db/lifecycle/drop.rs +++ b/server/src/db/lifecycle/drop.rs @@ -9,7 +9,7 @@ use snafu::ResultExt; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; use super::{ - error::{CommitError, Result}, + error::{CannotDropUnpersistedChunk, CommitError, Result}, LockableCatalogChunk, LockableCatalogPartition, }; use crate::db::catalog::{ @@ -34,6 +34,16 @@ pub fn drop_chunk( chunk: guard.addr().clone(), }); + // check if we're dropping an unpersisted chunk in a persisted DB + // See https://github.com/influxdata/influxdb_iox/issues/2291 + if db.rules().lifecycle_rules.persist && !matches!(guard.stage(), ChunkStage::Persisted { .. }) + { + return CannotDropUnpersistedChunk { + addr: guard.addr().clone(), + } + .fail(); + } + guard.set_dropping(®istration)?; // Drop locks diff --git a/server/src/db/lifecycle/error.rs b/server/src/db/lifecycle/error.rs index 6fb416f2ac..24e8227789 100644 --- a/server/src/db/lifecycle/error.rs +++ b/server/src/db/lifecycle/error.rs @@ -59,6 +59,9 @@ pub enum Error { #[snafu(display("Cannot write chunk: {}", addr))] CannotWriteChunk { addr: ChunkAddr }, + + #[snafu(display("Cannot drop unpersisted chunk: {}", addr))] + CannotDropUnpersistedChunk { addr: ChunkAddr }, } pub type Result = std::result::Result;