diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index 4350703793..cee23ae581 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -134,6 +134,138 @@ async fn test_namespace_create() { assert_eq!(response.status(), StatusCode::NO_CONTENT); } +/// Ensure invoking the gRPC NamespaceService to delete a namespace propagates +/// the catalog and denies writes after the cache has converged / router +/// restarted. +#[tokio::test] +async fn test_namespace_delete() { + // Initialise a TestContext requiring explicit namespace creation. + let ctx = TestContext::new(true, None).await; + + const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _; + + // Explicitly create the namespace. + let req = CreateNamespaceRequest { + name: "bananas_test".to_string(), + retention_period_ns: Some(RETENTION), + }; + let got = ctx + .grpc_delegate() + .namespace_service() + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + + assert_eq!(got.name, "bananas_test"); + assert_eq!(got.id, 1); + assert_eq!(got.retention_period_ns, Some(RETENTION)); + + // The namespace is usable. + let now = SystemProvider::default() + .now() + .timestamp_nanos() + .to_string(); + let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now; + let response = ctx + .write_lp("bananas", "test", &lp) + .await + .expect("write failed"); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // The RPC endpoint must not return the namespace. + { + let current = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert!(!current.namespaces.is_empty()); + } + + // Delete the namespace + { + let _resp = ctx + .grpc_delegate() + .namespace_service() + .delete_namespace(Request::new(DeleteNamespaceRequest { + name: "bananas_test".to_string(), + })) + .await + .expect("must delete"); + } + + // The RPC endpoint must not return the namespace. + { + let current = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert!(current.namespaces.is_empty()); + } + + // The catalog should contain the namespace, but "soft-deleted". + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert!(db_list.is_empty()); + + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::OnlyDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), got.id); + assert_eq!(ns.name, got.name); + assert_eq!(ns.retention_period_ns, got.retention_period_ns); + assert!(ns.deleted_at.is_some()); + }); + } + + // The cached entry is not affected, and writes continue to be validated + // against cached entry. + // + // https://github.com/influxdata/influxdb_iox/issues/6175 + + let response = ctx + .write_lp("bananas", "test", &lp) + .await + .expect("write failed"); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // The router restarts, and writes are no longer accepted for the + // soft-deleted bucket. + let ctx = ctx.restart(); + + let err = ctx + .write_lp("bananas", "test", lp) + .await + .expect_err("write should fail"); + assert_matches!( + err, + router::server::http::Error::NamespaceResolver(router::namespace_resolver::Error::Lookup( + iox_catalog::interface::Error::NamespaceNotFoundByName { .. } + )) + ); +} + /// Ensure creating a namespace with a retention period of 0 maps to "infinite" /// and not "none". #[tokio::test] diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 876a7f6ab5..da47fe3b3b 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -97,12 +97,24 @@ impl namespace_service_server::NamespaceService for NamespaceService { async fn delete_namespace( &self, - _request: Request, + request: Request, ) -> Result, Status> { - warn!("call to namespace delete - unimplemented"); - Err(Status::unimplemented( - "namespace delete is not yet supported", - )) + let namespace_name = request.into_inner().name; + + self.catalog + .repositories() + .await + .namespaces() + .soft_delete(&namespace_name) + .await + .map_err(|e| { + warn!(error=%e, %namespace_name, "failed to soft-delete namespace"); + Status::internal(e.to_string()) + })?; + + info!(namespace_name, "soft-deleted namespace"); + + Ok(Response::new(Default::default())) } async fn update_namespace_retention( @@ -179,21 +191,138 @@ fn map_retention_period(v: Option) -> Result, Status> { #[cfg(test)] mod tests { + use std::time::Duration; + + use assert_matches::assert_matches; + use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService as _; + use iox_catalog::mem::MemCatalog; use tonic::Code; use super::*; + const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _; + const NS_NAME: &str = "bananas"; + #[test] fn test_retention_mapping() { - assert_matches::assert_matches!(map_retention_period(None), Ok(None)); - assert_matches::assert_matches!(map_retention_period(Some(0)), Ok(None)); - assert_matches::assert_matches!(map_retention_period(Some(1)), Ok(Some(1))); - assert_matches::assert_matches!(map_retention_period(Some(42)), Ok(Some(42))); - assert_matches::assert_matches!(map_retention_period(Some(-1)), Err(e) => { + assert_matches!(map_retention_period(None), Ok(None)); + assert_matches!(map_retention_period(Some(0)), Ok(None)); + assert_matches!(map_retention_period(Some(1)), Ok(Some(1))); + assert_matches!(map_retention_period(Some(42)), Ok(Some(42))); + assert_matches!(map_retention_period(Some(-1)), Err(e) => { assert_eq!(e.code(), Code::InvalidArgument) }); - assert_matches::assert_matches!(map_retention_period(Some(-42)), Err(e) => { + assert_matches!(map_retention_period(Some(-42)), Err(e) => { assert_eq!(e.code(), Code::InvalidArgument) }); } + + #[tokio::test] + async fn test_crud() { + let catalog: Arc = + Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); + + let topic = catalog + .repositories() + .await + .topics() + .create_or_get("kafka-topic") + .await + .unwrap(); + let query_pool = catalog + .repositories() + .await + .query_pools() + .create_or_get("query-pool") + .await + .unwrap(); + + let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + + // There should be no namespaces to start with. + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert!(current.is_empty()); + } + + let req = CreateNamespaceRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(RETENTION), + }; + let created_ns = handler + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(created_ns.name, NS_NAME); + assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); + + // There should now be one namespace + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert_matches!(current.as_slice(), [ns] => { + assert_eq!(ns, &created_ns); + }) + } + + // Update the retention period + let updated_ns = handler + .update_namespace_retention(Request::new(UpdateNamespaceRetentionRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(0), // A zero! + })) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(updated_ns.name, created_ns.name); + assert_eq!(updated_ns.id, created_ns.id); + assert_eq!(created_ns.retention_period_ns, Some(42)); + assert_eq!(updated_ns.retention_period_ns, None); + + // Listing the namespaces should return the updated namespace + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert_matches!(current.as_slice(), [ns] => { + assert_eq!(ns, &updated_ns); + }) + } + + // Deleting the namespace should cause it to disappear + handler + .delete_namespace(Request::new(DeleteNamespaceRequest { + name: NS_NAME.to_string(), + })) + .await + .expect("must delete"); + + // Listing the namespaces should now return nothing. + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert_matches!(current.as_slice(), []); + } + } }