feat: add API to drop entire partitions

pull/24376/head
Marco Neumann 2021-08-16 17:30:14 +02:00
parent a20b452cf3
commit 77892a0998
6 changed files with 209 additions and 3 deletions

View File

@@ -37,6 +37,9 @@ pub enum Job {
/// Drop chunk from memory and (if persisted) from object store.
DropChunk { chunk: ChunkAddr },
/// Drop partition from memory and (if persisted) from object store.
DropPartition { partition: PartitionAddr },
/// Wipe preserved catalog
WipePreservedCatalog { db_name: Arc<str> },
}
@@ -51,6 +54,7 @@ impl Job {
Self::CompactChunks { partition, .. } => Some(&partition.db_name),
Self::PersistChunks { partition, .. } => Some(&partition.db_name),
Self::DropChunk { chunk, .. } => Some(&chunk.db_name),
Self::DropPartition { partition, .. } => Some(&partition.db_name),
Self::WipePreservedCatalog { db_name, .. } => Some(db_name),
}
}
@@ -64,6 +68,7 @@ impl Job {
Self::CompactChunks { partition, .. } => Some(&partition.partition_key),
Self::PersistChunks { partition, .. } => Some(&partition.partition_key),
Self::DropChunk { chunk, .. } => Some(&chunk.partition_key),
Self::DropPartition { partition, .. } => Some(&partition.partition_key),
Self::WipePreservedCatalog { .. } => None,
}
}
@@ -77,6 +82,7 @@ impl Job {
Self::CompactChunks { partition, .. } => Some(&partition.table_name),
Self::PersistChunks { partition, .. } => Some(&partition.table_name),
Self::DropChunk { chunk, .. } => Some(&chunk.table_name),
Self::DropPartition { partition, .. } => Some(&partition.table_name),
Self::WipePreservedCatalog { .. } => None,
}
}
@@ -90,6 +96,7 @@ impl Job {
Self::CompactChunks { chunks, .. } => Some(chunks.clone()),
Self::PersistChunks { chunks, .. } => Some(chunks.clone()),
Self::DropChunk { chunk, .. } => Some(vec![chunk.chunk_id]),
Self::DropPartition { .. } => None,
Self::WipePreservedCatalog { .. } => None,
}
}
@@ -103,6 +110,9 @@ impl Job {
Self::CompactChunks { .. } => "Compacting chunks to ReadBuffer",
Self::PersistChunks { .. } => "Persisting chunks to object storage",
Self::DropChunk { .. } => "Drop chunk from memory and (if persisted) from object store",
Self::DropPartition { .. } => {
"Drop partition from memory and (if persisted) from object store"
}
Self::WipePreservedCatalog { .. } => "Wipe preserved catalog",
}
}
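
Taken together, the new variant is wired through every metadata accessor of `Job`. A minimal sketch of constructing and inspecting it (hypothetical test; the accessor names are inferred from the match arms above, and `Job`/`PartitionAddr` are assumed to be in scope):

#[test]
fn drop_partition_job_metadata() {
    use std::sync::Arc;

    // Hypothetical: build the new job variant for a single partition.
    let job = Job::DropPartition {
        partition: PartitionAddr {
            db_name: Arc::from("mydb"),
            table_name: Arc::from("cpu"),
            partition_key: Arc::from("1970-01-01T00"),
        },
    };

    // Partition-level metadata is reported, but there is no chunk id list:
    // a partition drop covers all of the partition's chunks.
    assert_eq!(job.partition_key().map(|k| k.as_ref()), Some("1970-01-01T00"));
    assert!(job.chunk_ids().is_none());
}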

View File

@@ -39,6 +39,7 @@ message OperationMetadata {
CompactChunks compact_chunks = 10;
PersistChunks persist_chunks = 11;
DropChunk drop_chunk = 12;
DropPartition drop_partition = 17;
}
}
@@ -126,6 +127,18 @@ message DropChunk {
uint32 chunk_id = 3;
}
// Drop partition from memory and (if persisted) from object store.
message DropPartition {
// name of the database
string db_name = 1;
// partition key
string partition_key = 2;
// table name
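// (field number 3 is skipped here; DropChunk above uses 3 for chunk_id)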
string table_name = 4;
}
// Wipe preserved catalog
message WipePreservedCatalog {
// name of the database

View File

@@ -53,6 +53,11 @@ impl From<Job> for management::operation_metadata::Job {
table_name: chunk.table_name.to_string(),
chunk_id: chunk.chunk_id,
}),
Job::DropPartition { partition } => Self::DropPartition(management::DropPartition {
db_name: partition.db_name.to_string(),
partition_key: partition.partition_key.to_string(),
table_name: partition.table_name.to_string(),
}),
}
}
}
@@ -135,6 +140,17 @@ impl From<management::operation_metadata::Job> for Job {
chunk_id,
},
},
Job::DropPartition(management::DropPartition {
db_name,
partition_key,
table_name,
}) => Self::DropPartition {
partition: PartitionAddr {
db_name: Arc::from(db_name.as_str()),
table_name: Arc::from(table_name.as_str()),
partition_key: Arc::from(partition_key.as_str()),
},
},
}
}
}
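
Because the conversion is defined in both directions, a proto round trip should preserve the job. A hypothetical check (it assumes `Job` derives `Clone` and `PartialEq`):

#[test]
fn drop_partition_job_proto_round_trip() {
    use std::sync::Arc;

    let job = Job::DropPartition {
        partition: PartitionAddr {
            db_name: Arc::from("mydb"),
            table_name: Arc::from("cpu"),
            partition_key: Arc::from("1970-01-01T00"),
        },
    };

    // Into the generated proto type and back again.
    let proto: management::operation_metadata::Job = job.clone().into();
    let roundtripped: Job = proto.into();
    assert_eq!(job, roundtripped);
}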

View File

@@ -609,6 +609,25 @@ impl Db {
fut.await.context(TaskCancelled)?.context(LifecycleError)
}
/// Drops the specified partition from the catalog and all storage systems
pub async fn drop_partition(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
) -> Result<()> {
// Use an explicit scope to ensure the async generator doesn't
// assume the locks might live across the `await`
let fut = {
let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition::new(Arc::clone(self), partition);
let partition = partition.write();
let (_, fut) = lifecycle::drop_partition(partition).context(LifecycleError)?;
fut
};
fut.await.context(TaskCancelled)?.context(LifecycleError)
}
/// Copies a chunk in the Closed state into the ReadBuffer from
/// the mutable buffer and marks the chunk with `Moved` state
///
@@ -4259,6 +4278,47 @@ mod tests {
));
}
#[tokio::test]
async fn drop_unpersisted_partition_on_persisted_db() {
let test_db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
persist: true,
..Default::default()
})
.build()
.await;
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";
// We don't support dropping unpersisted chunks from a persisted DB because we would forget the write buffer
// progress (partition checkpoints are only created when new parquet files are stored).
// See https://github.com/influxdata/influxdb_iox/issues/2291
let err = db.drop_partition("cpu", partition_key).await.unwrap_err();
assert!(matches!(
err,
Error::LifecycleError {
source: super::lifecycle::Error::CannotDropUnpersistedChunk { .. }
}
));
// once persisted drop should work
db.persist_partition(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.drop_partition("cpu", partition_key).await.unwrap();
// no chunks left
assert_eq!(db.partition_chunk_summaries(partition_key), vec![]);
}
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";

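The user-facing surface of this change is a single method on `Db`. A sketch of a call site (hypothetical; assumes an async context whose error type can absorb the `db::Error` returned above):

// Drop all chunks of partition "1970-01-01T00" in table "cpu" from memory
// and, where persisted, from object store. With persistence enabled this
// fails with CannotDropUnpersistedChunk if any chunk is not yet persisted.
db.drop_partition("cpu", "1970-01-01T00").await?;
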
View File

@@ -34,7 +34,7 @@ use std::{
use tracker::{RwLock, TaskTracker};
pub(crate) use compact::compact_chunks;
-pub(crate) use drop::drop_chunk;
+pub(crate) use drop::{drop_chunk, drop_partition};
pub(crate) use error::{Error, Result};
pub(crate) use move_chunk::move_chunk_to_read_buffer;
pub(crate) use persist::persist_chunks;

View File

@@ -2,14 +2,14 @@ use std::sync::Arc;
use data_types::job::Job;
use futures::Future;
-use lifecycle::LifecycleWriteGuard;
+use lifecycle::{LifecycleWriteGuard, LockableChunk};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::debug;
use snafu::ResultExt;
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::{
-error::{CannotDropUnpersistedChunk, CommitError, Result},
+error::{CannotDropUnpersistedChunk, CommitError, Error, Result},
LockableCatalogChunk, LockableCatalogPartition,
};
use crate::db::catalog::{
@@ -80,3 +80,110 @@ pub fn drop_chunk(
Ok((tracker, fut.track(registration)))
}
pub fn drop_partition(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<()>> + Send>,
)> {
let db = Arc::clone(&partition.data().db);
let preserved_catalog = Arc::clone(&db.preserved_catalog);
let table_name = partition.table_name().to_string();
let partition_key = partition.key().to_string();
let (tracker, registration) = db.jobs.register(Job::DropPartition {
partition: partition.addr().clone(),
});
// get locks for all chunks
let lockable_chunks: Vec<_> = partition
.chunks()
.map(|chunk| LockableCatalogChunk {
db: Arc::clone(&db),
chunk: Arc::clone(chunk),
})
.collect();
let mut guards: Vec<_> = lockable_chunks
.iter()
.map(|lockable_chunk| lockable_chunk.write())
.collect();
// NOTE: here we could just use `drop_chunk` for every chunk, but that would lead to a large number of catalog
// transactions and is rather inefficient.
// pre-check all chunks before touching any
for guard in &guards {
// check if we're dropping an unpersisted chunk in a persisted DB
// See https://github.com/influxdata/influxdb_iox/issues/2291
if db.rules().lifecycle_rules.persist
&& !matches!(guard.stage(), ChunkStage::Persisted { .. })
{
return CannotDropUnpersistedChunk {
addr: guard.addr().clone(),
}
.fail();
}
if let Some(action) = guard.lifecycle_action() {
return Err(Error::ChunkError {
source: crate::db::catalog::chunk::Error::LifecycleActionAlreadyInProgress {
chunk: guard.addr().clone(),
lifecycle_action: action.metadata().name().to_string(),
},
});
}
}
// start lifecycle action
for guard in &mut guards {
guard.set_dropping(&registration)?;
}
// Drop locks
let chunks: Vec<_> = guards
.into_iter()
.map(|guard| guard.into_data().chunk)
.collect();
let partition = partition.into_data().partition;
let fut = async move {
debug!(%table_name, %partition_key, "dropping partition");
// collect parquet files that we need to remove
let mut paths = vec![];
for chunk in &chunks {
let chunk_read = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk_read.stage() {
let path: DirsAndFileName = parquet.path().into();
paths.push(path);
}
}
// only create catalog transaction when there's anything to do
if !paths.is_empty() {
let mut transaction = preserved_catalog.open_transaction().await;
for path in paths {
transaction.remove_parquet(&path);
}
transaction.commit().await.context(CommitError)?;
}
let mut partition = partition.write();
for chunk in &chunks {
let chunk_id = {
let chunk_read = chunk.read();
chunk_read.id()
};
partition
.drop_chunk(chunk_id)
.expect("how did we end up dropping a chunk that cannot be dropped?!");
}
Ok(())
};
Ok((tracker, fut.track(registration)))
}
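
Two design points are worth calling out. First, every chunk is validated (persistence state, no competing lifecycle action) while all chunk locks are held and before any chunk is marked as dropping, so an invalid partition drop is rejected up front rather than leaving a subset of chunks mid-drop. Second, the parquet files of all chunks are removed in a single preserved-catalog transaction rather than one per chunk, which is precisely why this function does not simply call `drop_chunk` in a loop (see the NOTE above).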