fix: disallow dropping unpersted chunks from persisted DB
It doesn't play well w/ replay at the moment since we would forget which sequence numbers we've already seen. Fixes #2291.pull/24376/head
parent
756f5c6699
commit
825c19d726
|
@ -4218,6 +4218,39 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn drop_unpersisted_chunk_on_persisted_db() {
|
||||
// We don't support dropping unpersisted chunks from a persisted DB because we would forget the write buffer
|
||||
// progress (partition checkpoints are only created when new parquet files are stored).
|
||||
// See https://github.com/influxdata/influxdb_iox/issues/2291
|
||||
let test_db = TestDb::builder()
|
||||
.lifecycle_rules(LifecycleRules {
|
||||
persist: true,
|
||||
..Default::default()
|
||||
})
|
||||
.build()
|
||||
.await;
|
||||
let db = Arc::new(test_db.db);
|
||||
|
||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let chunks = db.partition_chunk_summaries(partition_key);
|
||||
assert_eq!(chunks.len(), 1);
|
||||
let chunk_id = chunks[0].id;
|
||||
|
||||
let err = db
|
||||
.drop_chunk("cpu", partition_key, chunk_id)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
Error::LifecycleError {
|
||||
source: super::lifecycle::Error::CannotDropUnpersistedChunk { .. }
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
|
||||
write_lp(db, "cpu bar=1 10").await;
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
|
|
@ -9,7 +9,7 @@ use snafu::ResultExt;
|
|||
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
|
||||
|
||||
use super::{
|
||||
error::{CommitError, Result},
|
||||
error::{CannotDropUnpersistedChunk, CommitError, Result},
|
||||
LockableCatalogChunk, LockableCatalogPartition,
|
||||
};
|
||||
use crate::db::catalog::{
|
||||
|
@ -34,6 +34,16 @@ pub fn drop_chunk(
|
|||
chunk: guard.addr().clone(),
|
||||
});
|
||||
|
||||
// check if we're dropping an unpersisted chunk in a persisted DB
|
||||
// See https://github.com/influxdata/influxdb_iox/issues/2291
|
||||
if db.rules().lifecycle_rules.persist && !matches!(guard.stage(), ChunkStage::Persisted { .. })
|
||||
{
|
||||
return CannotDropUnpersistedChunk {
|
||||
addr: guard.addr().clone(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
guard.set_dropping(®istration)?;
|
||||
|
||||
// Drop locks
|
||||
|
|
|
@ -59,6 +59,9 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Cannot write chunk: {}", addr))]
|
||||
CannotWriteChunk { addr: ChunkAddr },
|
||||
|
||||
#[snafu(display("Cannot drop unpersisted chunk: {}", addr))]
|
||||
CannotDropUnpersistedChunk { addr: ChunkAddr },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
|
Loading…
Reference in New Issue