feat(router): soft-delete RPC handler
This implements the RPC "delete_namespace" handler, allowing a namespace to be soft-deleted. Adds unit coverage for all handlers & e2e test coverage for the new handler (the rest were already covered). The tests also highlight the caching issue documented here: https://github.com/influxdata/influxdb_iox/issues/6175pull/24376/head
parent
4a30df0e07
commit
61fb92b85c
|
@ -134,6 +134,138 @@ async fn test_namespace_create() {
|
|||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
}
|
||||
|
||||
/// Ensure invoking the gRPC NamespaceService to delete a namespace propagates
|
||||
/// the catalog and denies writes after the cache has converged / router
|
||||
/// restarted.
|
||||
#[tokio::test]
|
||||
async fn test_namespace_delete() {
|
||||
// Initialise a TestContext requiring explicit namespace creation.
|
||||
let ctx = TestContext::new(true, None).await;
|
||||
|
||||
const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _;
|
||||
|
||||
// Explicitly create the namespace.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
};
|
||||
let got = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
|
||||
assert_eq!(got.name, "bananas_test");
|
||||
assert_eq!(got.id, 1);
|
||||
assert_eq!(got.retention_period_ns, Some(RETENTION));
|
||||
|
||||
// The namespace is usable.
|
||||
let now = SystemProvider::default()
|
||||
.now()
|
||||
.timestamp_nanos()
|
||||
.to_string();
|
||||
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
|
||||
let response = ctx
|
||||
.write_lp("bananas", "test", &lp)
|
||||
.await
|
||||
.expect("write failed");
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// The RPC endpoint must not return the namespace.
|
||||
{
|
||||
let current = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner();
|
||||
assert!(!current.namespaces.is_empty());
|
||||
}
|
||||
|
||||
// Delete the namespace
|
||||
{
|
||||
let _resp = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.delete_namespace(Request::new(DeleteNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
}))
|
||||
.await
|
||||
.expect("must delete");
|
||||
}
|
||||
|
||||
// The RPC endpoint must not return the namespace.
|
||||
{
|
||||
let current = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner();
|
||||
assert!(current.namespaces.is_empty());
|
||||
}
|
||||
|
||||
// The catalog should contain the namespace, but "soft-deleted".
|
||||
{
|
||||
let db_list = ctx
|
||||
.catalog()
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.list(SoftDeletedRows::ExcludeDeleted)
|
||||
.await
|
||||
.expect("query failure");
|
||||
assert!(db_list.is_empty());
|
||||
|
||||
let db_list = ctx
|
||||
.catalog()
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.list(SoftDeletedRows::OnlyDeleted)
|
||||
.await
|
||||
.expect("query failure");
|
||||
assert_matches!(db_list.as_slice(), [ns] => {
|
||||
assert_eq!(ns.id.get(), got.id);
|
||||
assert_eq!(ns.name, got.name);
|
||||
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
|
||||
assert!(ns.deleted_at.is_some());
|
||||
});
|
||||
}
|
||||
|
||||
// The cached entry is not affected, and writes continue to be validated
|
||||
// against cached entry.
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/6175
|
||||
|
||||
let response = ctx
|
||||
.write_lp("bananas", "test", &lp)
|
||||
.await
|
||||
.expect("write failed");
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// The router restarts, and writes are no longer accepted for the
|
||||
// soft-deleted bucket.
|
||||
let ctx = ctx.restart();
|
||||
|
||||
let err = ctx
|
||||
.write_lp("bananas", "test", lp)
|
||||
.await
|
||||
.expect_err("write should fail");
|
||||
assert_matches!(
|
||||
err,
|
||||
router::server::http::Error::NamespaceResolver(router::namespace_resolver::Error::Lookup(
|
||||
iox_catalog::interface::Error::NamespaceNotFoundByName { .. }
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
/// Ensure creating a namespace with a retention period of 0 maps to "infinite"
|
||||
/// and not "none".
|
||||
#[tokio::test]
|
||||
|
|
|
@ -97,12 +97,24 @@ impl namespace_service_server::NamespaceService for NamespaceService {
|
|||
|
||||
async fn delete_namespace(
|
||||
&self,
|
||||
_request: Request<DeleteNamespaceRequest>,
|
||||
request: Request<DeleteNamespaceRequest>,
|
||||
) -> Result<Response<DeleteNamespaceResponse>, Status> {
|
||||
warn!("call to namespace delete - unimplemented");
|
||||
Err(Status::unimplemented(
|
||||
"namespace delete is not yet supported",
|
||||
))
|
||||
let namespace_name = request.into_inner().name;
|
||||
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.soft_delete(&namespace_name)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
warn!(error=%e, %namespace_name, "failed to soft-delete namespace");
|
||||
Status::internal(e.to_string())
|
||||
})?;
|
||||
|
||||
info!(namespace_name, "soft-deleted namespace");
|
||||
|
||||
Ok(Response::new(Default::default()))
|
||||
}
|
||||
|
||||
async fn update_namespace_retention(
|
||||
|
@ -179,21 +191,138 @@ fn map_retention_period(v: Option<i64>) -> Result<Option<i64>, Status> {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService as _;
|
||||
use iox_catalog::mem::MemCatalog;
|
||||
use tonic::Code;
|
||||
|
||||
use super::*;
|
||||
|
||||
const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _;
|
||||
const NS_NAME: &str = "bananas";
|
||||
|
||||
#[test]
|
||||
fn test_retention_mapping() {
|
||||
assert_matches::assert_matches!(map_retention_period(None), Ok(None));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(0)), Ok(None));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(1)), Ok(Some(1)));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(42)), Ok(Some(42)));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(-1)), Err(e) => {
|
||||
assert_matches!(map_retention_period(None), Ok(None));
|
||||
assert_matches!(map_retention_period(Some(0)), Ok(None));
|
||||
assert_matches!(map_retention_period(Some(1)), Ok(Some(1)));
|
||||
assert_matches!(map_retention_period(Some(42)), Ok(Some(42)));
|
||||
assert_matches!(map_retention_period(Some(-1)), Err(e) => {
|
||||
assert_eq!(e.code(), Code::InvalidArgument)
|
||||
});
|
||||
assert_matches::assert_matches!(map_retention_period(Some(-42)), Err(e) => {
|
||||
assert_matches!(map_retention_period(Some(-42)), Err(e) => {
|
||||
assert_eq!(e.code(), Code::InvalidArgument)
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_crud() {
|
||||
let catalog: Arc<dyn Catalog> =
|
||||
Arc::new(MemCatalog::new(Arc::new(metric::Registry::default())));
|
||||
|
||||
let topic = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.topics()
|
||||
.create_or_get("kafka-topic")
|
||||
.await
|
||||
.unwrap();
|
||||
let query_pool = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.query_pools()
|
||||
.create_or_get("query-pool")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id));
|
||||
|
||||
// There should be no namespaces to start with.
|
||||
{
|
||||
let current = handler
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner()
|
||||
.namespaces;
|
||||
assert!(current.is_empty());
|
||||
}
|
||||
|
||||
let req = CreateNamespaceRequest {
|
||||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: Some(RETENTION),
|
||||
};
|
||||
let created_ns = handler
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
assert_eq!(created_ns.name, NS_NAME);
|
||||
assert_eq!(created_ns.retention_period_ns, Some(RETENTION));
|
||||
|
||||
// There should now be one namespace
|
||||
{
|
||||
let current = handler
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner()
|
||||
.namespaces;
|
||||
assert_matches!(current.as_slice(), [ns] => {
|
||||
assert_eq!(ns, &created_ns);
|
||||
})
|
||||
}
|
||||
|
||||
// Update the retention period
|
||||
let updated_ns = handler
|
||||
.update_namespace_retention(Request::new(UpdateNamespaceRetentionRequest {
|
||||
name: NS_NAME.to_string(),
|
||||
retention_period_ns: Some(0), // A zero!
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
assert_eq!(updated_ns.name, created_ns.name);
|
||||
assert_eq!(updated_ns.id, created_ns.id);
|
||||
assert_eq!(created_ns.retention_period_ns, Some(42));
|
||||
assert_eq!(updated_ns.retention_period_ns, None);
|
||||
|
||||
// Listing the namespaces should return the updated namespace
|
||||
{
|
||||
let current = handler
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner()
|
||||
.namespaces;
|
||||
assert_matches!(current.as_slice(), [ns] => {
|
||||
assert_eq!(ns, &updated_ns);
|
||||
})
|
||||
}
|
||||
|
||||
// Deleting the namespace should cause it to disappear
|
||||
handler
|
||||
.delete_namespace(Request::new(DeleteNamespaceRequest {
|
||||
name: NS_NAME.to_string(),
|
||||
}))
|
||||
.await
|
||||
.expect("must delete");
|
||||
|
||||
// Listing the namespaces should now return nothing.
|
||||
{
|
||||
let current = handler
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner()
|
||||
.namespaces;
|
||||
assert_matches!(current.as_slice(), []);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue