feat: Add a gRPC endpoint to delete a skipped compaction
Also add a CLI usage of it for conveniencepull/24376/head
parent
df2ba85661
commit
fa7fe2e9cf
|
@ -2,7 +2,7 @@
|
|||
|
||||
use crate::{cold, compact::Compactor, hot};
|
||||
use async_trait::async_trait;
|
||||
use data_types::SkippedCompaction;
|
||||
use data_types::{PartitionId, SkippedCompaction};
|
||||
use futures::{
|
||||
future::{BoxFuture, Shared},
|
||||
FutureExt, TryFutureExt,
|
||||
|
@ -29,6 +29,12 @@ pub trait CompactorHandler: Send + Sync {
|
|||
&self,
|
||||
) -> Result<Vec<SkippedCompaction>, ListSkippedCompactionsError>;
|
||||
|
||||
/// Delete skipped compactions from the catalog
|
||||
async fn delete_skipped_compactions(
|
||||
&self,
|
||||
partition_id: PartitionId,
|
||||
) -> Result<Option<SkippedCompaction>, DeleteSkippedCompactionsError>;
|
||||
|
||||
/// Wait until the handler finished to shutdown.
|
||||
///
|
||||
/// Use [`shutdown`](Self::shutdown) to trigger a shutdown.
|
||||
|
@ -207,6 +213,13 @@ pub enum ListSkippedCompactionsError {
|
|||
SkippedCompactionLookup(iox_catalog::interface::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
pub enum DeleteSkippedCompactionsError {
|
||||
#[error(transparent)]
|
||||
SkippedCompactionDelete(iox_catalog::interface::Error),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CompactorHandler for CompactorHandlerImpl {
|
||||
async fn skipped_compactions(
|
||||
|
@ -222,6 +235,20 @@ impl CompactorHandler for CompactorHandlerImpl {
|
|||
.map_err(ListSkippedCompactionsError::SkippedCompactionLookup)
|
||||
}
|
||||
|
||||
async fn delete_skipped_compactions(
|
||||
&self,
|
||||
partition_id: PartitionId,
|
||||
) -> Result<Option<SkippedCompaction>, DeleteSkippedCompactionsError> {
|
||||
self.compactor
|
||||
.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.delete_skipped_compactions(partition_id)
|
||||
.await
|
||||
.map_err(DeleteSkippedCompactionsError::SkippedCompactionDelete)
|
||||
}
|
||||
|
||||
async fn join(&self) {
|
||||
self.runner_handle
|
||||
.clone()
|
||||
|
@ -283,4 +310,47 @@ mod tests {
|
|||
assert_eq!(skipped_compactions.len(), 1);
|
||||
assert_eq!(skipped_compactions[0].partition_id, partition.partition.id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_skipped_compactions() {
|
||||
let TestSetup {
|
||||
compactor,
|
||||
table,
|
||||
shard,
|
||||
..
|
||||
} = test_setup().await;
|
||||
|
||||
let compactor_handler = CompactorHandlerImpl::new(Arc::clone(&compactor));
|
||||
|
||||
// no skipped compactions to delete
|
||||
let partition_id_that_does_not_exist = PartitionId::new(0);
|
||||
let deleted_skipped_compaction = compactor_handler
|
||||
.delete_skipped_compactions(partition_id_that_does_not_exist)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(deleted_skipped_compaction.is_none());
|
||||
|
||||
// insert a partition and a skipped compaction
|
||||
let partition = table.with_shard(&shard).create_partition("one").await;
|
||||
{
|
||||
let mut repos = compactor.catalog.repositories().await;
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(partition.partition.id, "Not today", 3, 2, 100_000, 100)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let deleted_skipped_compaction = compactor_handler
|
||||
.delete_skipped_compactions(partition.partition.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("Should have deleted one skipped compaction");
|
||||
|
||||
assert_eq!(
|
||||
deleted_skipped_compaction.partition_id,
|
||||
partition.partition.id,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
//! gRPC service implementations for `compactor`.
|
||||
|
||||
use crate::handler::{CompactorHandler, ListSkippedCompactionsError};
|
||||
use crate::handler::{
|
||||
CompactorHandler, DeleteSkippedCompactionsError, ListSkippedCompactionsError,
|
||||
};
|
||||
use data_types::PartitionId;
|
||||
use generated_types::influxdata::iox::compactor::v1::{
|
||||
self as proto,
|
||||
compaction_service_server::{CompactionService, CompactionServiceServer},
|
||||
|
@ -51,6 +54,17 @@ impl From<ListSkippedCompactionsError> for tonic::Status {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<DeleteSkippedCompactionsError> for tonic::Status {
|
||||
/// Logs and converts a result from the business logic into the appropriate tonic status
|
||||
fn from(err: DeleteSkippedCompactionsError) -> Self {
|
||||
use DeleteSkippedCompactionsError::*;
|
||||
|
||||
match err {
|
||||
SkippedCompactionDelete(_) => Self::internal(err.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl CompactionService for CompactionServiceImpl {
|
||||
async fn list_skipped_compactions(
|
||||
|
@ -71,4 +85,22 @@ impl CompactionService for CompactionServiceImpl {
|
|||
},
|
||||
))
|
||||
}
|
||||
|
||||
async fn delete_skipped_compactions(
|
||||
&self,
|
||||
request: Request<proto::DeleteSkippedCompactionsRequest>,
|
||||
) -> Result<Response<proto::DeleteSkippedCompactionsResponse>, tonic::Status> {
|
||||
let partition_id = request.into_inner().partition_id;
|
||||
let partition_id = PartitionId::new(partition_id);
|
||||
|
||||
let skipped_compaction = self
|
||||
.handler
|
||||
.delete_skipped_compactions(partition_id)
|
||||
.await?
|
||||
.map(From::from);
|
||||
|
||||
Ok(tonic::Response::new(
|
||||
proto::DeleteSkippedCompactionsResponse { skipped_compaction },
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,9 @@ option go_package = "github.com/influxdata/iox/compactor/v1";
|
|||
service CompactionService {
|
||||
// List all skipped compactions in the catalog
|
||||
rpc ListSkippedCompactions(ListSkippedCompactionsRequest) returns (ListSkippedCompactionsResponse);
|
||||
|
||||
// Delete a skipped compaction by partition ID
|
||||
rpc DeleteSkippedCompactions(DeleteSkippedCompactionsRequest) returns (DeleteSkippedCompactionsResponse);
|
||||
}
|
||||
|
||||
message ListSkippedCompactionsRequest {}
|
||||
|
@ -40,3 +43,12 @@ message SkippedCompaction {
|
|||
// operation at the time this compaction was skipped.
|
||||
int64 limit_bytes = 7;
|
||||
}
|
||||
|
||||
message DeleteSkippedCompactionsRequest {
|
||||
int64 partition_id = 1;
|
||||
}
|
||||
|
||||
message DeleteSkippedCompactionsResponse {
|
||||
// The deleted skipped compaction
|
||||
optional SkippedCompaction skipped_compaction = 1;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,9 @@ pub struct Config {
|
|||
enum Command {
|
||||
/// List all skipped compactions
|
||||
List,
|
||||
|
||||
/// Delete the requested skipped compaction
|
||||
Delete { partition_id: i64 },
|
||||
}
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<(), Error> {
|
||||
|
@ -37,6 +40,18 @@ pub async fn command(connection: Connection, config: Config) -> Result<(), Error
|
|||
Command::List => {
|
||||
let skipped_compactions = client.skipped_compactions().await?;
|
||||
println!("{}", create_table(&skipped_compactions));
|
||||
}
|
||||
|
||||
Command::Delete { partition_id } => {
|
||||
let deleted_skipped_compactions =
|
||||
client.delete_skipped_compactions(partition_id).await?;
|
||||
|
||||
let deleted_skipped_compactions = deleted_skipped_compactions
|
||||
.as_ref()
|
||||
.map(std::slice::from_ref)
|
||||
.unwrap_or_default();
|
||||
|
||||
println!("{}", create_table(deleted_skipped_compactions));
|
||||
} // Deliberately not adding _ => so the compiler will direct people here to impl new
|
||||
// commands
|
||||
}
|
||||
|
|
|
@ -30,4 +30,17 @@ impl Client {
|
|||
|
||||
Ok(response.into_inner().skipped_compactions)
|
||||
}
|
||||
|
||||
/// Delete the requested skipped compaction
|
||||
pub async fn delete_skipped_compactions(
|
||||
&mut self,
|
||||
partition_id: i64,
|
||||
) -> Result<Option<SkippedCompaction>, Error> {
|
||||
let response = self
|
||||
.inner
|
||||
.delete_skipped_compactions(DeleteSkippedCompactionsRequest { partition_id })
|
||||
.await?;
|
||||
|
||||
Ok(response.into_inner().skipped_compaction)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue