feat: drop partition gRPC
parent
9454e06d61
commit
fcf2bee443
|
@ -73,6 +73,9 @@ service ManagementService {
|
|||
|
||||
// Skip replay for given DB.
|
||||
rpc SkipReplay(SkipReplayRequest) returns (SkipReplayResponse);
|
||||
|
||||
// Drop partition from memory and (if persisted) from object store.
|
||||
rpc DropPartition(DropPartitionRequest) returns (DropPartitionResponse);
|
||||
}
|
||||
|
||||
message GetServerIdRequest {}
|
||||
|
@ -345,3 +348,19 @@ message SkipReplayRequest {
|
|||
|
||||
message SkipReplayResponse {
|
||||
}
|
||||
|
||||
|
||||
// Request to drop partition from memory and (if persisted) from object store.
|
||||
message DropPartitionRequest {
|
||||
// the name of the database
|
||||
string db_name = 1;
|
||||
|
||||
// the partition key
|
||||
string partition_key = 2;
|
||||
|
||||
// the table name
|
||||
string table_name = 3;
|
||||
}
|
||||
|
||||
message DropPartitionResponse {
|
||||
}
|
||||
|
|
|
@ -317,6 +317,26 @@ pub enum SkipReplayError {
|
|||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// Errors returned by [`Client::drop_partition`]
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DropPartitionError {
|
||||
/// Database not found
|
||||
#[error("Not found: {}", .0)]
|
||||
NotFound(String),
|
||||
|
||||
/// Server indicated that it is not (yet) available
|
||||
#[error("Server unavailable: {}", .0.message())]
|
||||
Unavailable(tonic::Status),
|
||||
|
||||
/// Server indicated that it is not (yet) available
|
||||
#[error("Cannot perform operation due to wrong chunk lifecycle state: {}", .0.message())]
|
||||
LifecycleError(tonic::Status),
|
||||
|
||||
/// Client received an unexpected error from the server
|
||||
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
|
||||
ServerError(tonic::Status),
|
||||
}
|
||||
|
||||
/// An IOx Management API client.
|
||||
///
|
||||
/// This client wraps the underlying `tonic` generated client with a
|
||||
|
@ -778,4 +798,32 @@ impl Client {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Drop partition from memory and (if persisted) from object store.
|
||||
pub async fn drop_partition(
|
||||
&mut self,
|
||||
db_name: impl Into<String> + Send,
|
||||
table_name: impl Into<String> + Send,
|
||||
partition_key: impl Into<String> + Send,
|
||||
) -> Result<(), DropPartitionError> {
|
||||
let db_name = db_name.into();
|
||||
let partition_key = partition_key.into();
|
||||
let table_name = table_name.into();
|
||||
|
||||
self.inner
|
||||
.drop_partition(DropPartitionRequest {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
})
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::NotFound => DropPartitionError::NotFound(status.message().to_string()),
|
||||
tonic::Code::Unavailable => DropPartitionError::Unavailable(status),
|
||||
tonic::Code::FailedPrecondition => DropPartitionError::LifecycleError(status),
|
||||
_ => DropPartitionError::ServerError(status),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -484,6 +484,30 @@ where
|
|||
|
||||
Ok(Response::new(SkipReplayResponse {}))
|
||||
}
|
||||
|
||||
async fn drop_partition(
|
||||
&self,
|
||||
request: tonic::Request<DropPartitionRequest>,
|
||||
) -> Result<tonic::Response<DropPartitionResponse>, tonic::Status> {
|
||||
let DropPartitionRequest {
|
||||
db_name,
|
||||
partition_key,
|
||||
table_name,
|
||||
} = request.into_inner();
|
||||
|
||||
// Validate that the database name is legit
|
||||
let db_name = DatabaseName::new(db_name).field("db_name")?;
|
||||
let db = self
|
||||
.server
|
||||
.db(&db_name)
|
||||
.map_err(default_server_error_handler)?;
|
||||
|
||||
db.drop_partition(&table_name, &partition_key)
|
||||
.await
|
||||
.map_err(default_db_error_handler)?;
|
||||
|
||||
Ok(Response::new(DropPartitionResponse {}))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
|
|
|
@ -1126,3 +1126,103 @@ async fn test_chunk_access_time() {
|
|||
assert!(t2 < t3, "{} {}", t2, t3);
|
||||
assert_eq!(t3, t4)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_partition() {
|
||||
use data_types::chunk_metadata::ChunkStorage;
|
||||
|
||||
let fixture = ServerFixture::create_shared().await;
|
||||
let mut write_client = fixture.write_client();
|
||||
let mut management_client = fixture.management_client();
|
||||
|
||||
let db_name = rand_name();
|
||||
DatabaseBuilder::new(db_name.clone())
|
||||
.persist(true)
|
||||
.persist_age_threshold_seconds(1)
|
||||
.late_arrive_window_seconds(1)
|
||||
.build(fixture.grpc_channel())
|
||||
.await;
|
||||
|
||||
let lp_lines: Vec<_> = (0..1_000)
|
||||
.map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
|
||||
.collect();
|
||||
|
||||
let num_lines_written = write_client
|
||||
.write(&db_name, lp_lines.join("\n"))
|
||||
.await
|
||||
.expect("successful write");
|
||||
assert_eq!(num_lines_written, 1000);
|
||||
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::ReadBufferAndObjectStore],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
let chunks = management_client
|
||||
.list_chunks(&db_name)
|
||||
.await
|
||||
.expect("listing chunks");
|
||||
assert_eq!(chunks.len(), 1);
|
||||
let partition_key = &chunks[0].partition_key;
|
||||
|
||||
management_client
|
||||
.drop_partition(&db_name, "data", &partition_key[..])
|
||||
.await
|
||||
.unwrap();
|
||||
let chunks = management_client
|
||||
.list_chunks(&db_name)
|
||||
.await
|
||||
.expect("listing chunks");
|
||||
assert_eq!(chunks.len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_partition_error() {
|
||||
use data_types::chunk_metadata::ChunkStorage;
|
||||
|
||||
let fixture = ServerFixture::create_shared().await;
|
||||
let mut write_client = fixture.write_client();
|
||||
let mut management_client = fixture.management_client();
|
||||
|
||||
let db_name = rand_name();
|
||||
DatabaseBuilder::new(db_name.clone())
|
||||
.persist(true)
|
||||
.persist_age_threshold_seconds(1_000)
|
||||
.late_arrive_window_seconds(1_000)
|
||||
.build(fixture.grpc_channel())
|
||||
.await;
|
||||
|
||||
let lp_lines: Vec<_> = (0..1_000)
|
||||
.map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
|
||||
.collect();
|
||||
|
||||
let num_lines_written = write_client
|
||||
.write(&db_name, lp_lines.join("\n"))
|
||||
.await
|
||||
.expect("successful write");
|
||||
assert_eq!(num_lines_written, 1000);
|
||||
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::OpenMutableBuffer],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
let chunks = management_client
|
||||
.list_chunks(&db_name)
|
||||
.await
|
||||
.expect("listing chunks");
|
||||
assert_eq!(chunks.len(), 1);
|
||||
let partition_key = &chunks[0].partition_key;
|
||||
|
||||
let err = management_client
|
||||
.drop_partition(&db_name, "data", &partition_key[..])
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_contains!(err.to_string(), "Cannot drop unpersisted chunk");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue