From f4990225111d8d1e27f9ebb2184abac3710b10c1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Feb 2023 10:28:19 +0100 Subject: [PATCH 1/6] feat: add compaction level to commit metrics (#6985) * feat: add compaction level to commit metrics * test: more realism --- Cargo.lock | 1 + compactor2/Cargo.toml | 1 + compactor2/src/components/commit/metrics.rs | 319 ++++++++++++++++---- data_types/src/lib.rs | 18 +- iox_tests/src/builders.rs | 8 +- 5 files changed, 286 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db57c97165..433b2dd6f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -991,6 +991,7 @@ dependencies = [ "iox_query", "iox_tests", "iox_time", + "itertools", "metric", "object_store", "observability_deps", diff --git a/compactor2/Cargo.toml b/compactor2/Cargo.toml index 3726a0b3a7..189ac24a5e 100644 --- a/compactor2/Cargo.toml +++ b/compactor2/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3" iox_catalog = { path = "../iox_catalog" } iox_query = { path = "../iox_query" } iox_time = { path = "../iox_time" } +itertools = "0.10.5" metric = { path = "../metric" } object_store = "0.5.4" observability_deps = { path = "../observability_deps" } diff --git a/compactor2/src/components/commit/metrics.rs b/compactor2/src/components/commit/metrics.rs index 5adfdd8389..67d48eb518 100644 --- a/compactor2/src/components/commit/metrics.rs +++ b/compactor2/src/components/commit/metrics.rs @@ -1,7 +1,8 @@ -use std::fmt::Display; +use std::{collections::HashMap, fmt::Display}; use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; +use itertools::Itertools; use metric::{Registry, U64Histogram, U64HistogramOptions}; use super::Commit; @@ -44,9 +45,14 @@ impl From for U64HistogramOptions { #[derive(Debug)] struct Histogram { - create: U64Histogram, - delete: U64Histogram, - upgrade: U64Histogram, + /// Files created, by the level they were created at. + create: HashMap, + + /// Files deleted, by the level they had at this point in time. + delete: HashMap, + + /// Files upgraded, by level they had before the upgrade and the target compaction level. + upgrade: HashMap<(CompactionLevel, CompactionLevel), U64Histogram>, } impl Histogram { @@ -59,9 +65,34 @@ impl Histogram { let metric = registry .register_metric_with_options::(name, description, || t.into()); - let create = metric.recorder(&[("op", "create")]); - let delete = metric.recorder(&[("op", "delete")]); - let upgrade = metric.recorder(&[("op", "upgrade")]); + let create = CompactionLevel::all() + .iter() + .map(|level| { + ( + *level, + metric.recorder(&[("op", "create"), ("level", level.name())]), + ) + }) + .collect(); + let delete = CompactionLevel::all() + .iter() + .map(|level| { + ( + *level, + metric.recorder(&[("op", "delete"), ("level", level.name())]), + ) + }) + .collect(); + let upgrade = CompactionLevel::all() + .iter() + .cartesian_product(CompactionLevel::all()) + .map(|(from, to)| { + ( + (*from, *to), + metric.recorder(&[("op", "upgrade"), ("from", from.name()), ("to", to.name())]), + ) + }) + .collect(); Self { create, delete, @@ -160,43 +191,110 @@ where // per file metrics for f in create { - self.file_bytes.create.record(f.file_size_bytes as u64); - self.file_rows.create.record(f.row_count as u64); + self.file_bytes + .create + .get(&f.compaction_level) + .expect("all compaction levels covered") + .record(f.file_size_bytes as u64); + self.file_rows + .create + .get(&f.compaction_level) + .expect("all compaction levels covered") + .record(f.row_count as u64); } for f in delete { - self.file_bytes.delete.record(f.file_size_bytes as u64); - self.file_rows.delete.record(f.row_count as u64); + self.file_bytes + .delete + .get(&f.compaction_level) + .expect("all compaction levels covered") + .record(f.file_size_bytes as u64); + self.file_rows + .delete + .get(&f.compaction_level) + .expect("all compaction levels covered") + .record(f.row_count as u64); } for f in upgrade { - self.file_bytes.upgrade.record(f.file_size_bytes as u64); - self.file_rows.upgrade.record(f.row_count as u64); + self.file_bytes + .upgrade + .get(&(f.compaction_level, target_level)) + .expect("all compaction levels covered") + .record(f.file_size_bytes as u64); + self.file_rows + .upgrade + .get(&(f.compaction_level, target_level)) + .expect("all compaction levels covered") + .record(f.row_count as u64); } // per-partition metrics - self.job_files.create.record(create.len() as u64); - self.job_files.delete.record(delete.len() as u64); - self.job_files.upgrade.record(upgrade.len() as u64); - self.job_bytes - .create - .record(create.iter().map(|f| f.file_size_bytes as u64).sum::()); - self.job_bytes - .delete - .record(delete.iter().map(|f| f.file_size_bytes as u64).sum::()); - self.job_bytes.upgrade.record( - upgrade + for file_level in CompactionLevel::all() { + let create = create .iter() - .map(|f| f.file_size_bytes as u64) - .sum::(), - ); - self.job_rows - .create - .record(create.iter().map(|f| f.row_count as u64).sum::()); - self.job_rows - .delete - .record(delete.iter().map(|f| f.row_count as u64).sum::()); - self.job_rows - .upgrade - .record(upgrade.iter().map(|f| f.row_count as u64).sum::()); + .filter(|f| f.compaction_level == *file_level) + .collect::>(); + let delete = delete + .iter() + .filter(|f| f.compaction_level == *file_level) + .collect::>(); + let upgrade = upgrade + .iter() + .filter(|f| f.compaction_level == *file_level) + .collect::>(); + + self.job_files + .create + .get(file_level) + .expect("all compaction levels covered") + .record(create.len() as u64); + self.job_bytes + .create + .get(file_level) + .expect("all compaction levels covered") + .record(create.iter().map(|f| f.file_size_bytes as u64).sum::()); + self.job_rows + .create + .get(file_level) + .expect("all compaction levels covered") + .record(create.iter().map(|f| f.row_count as u64).sum::()); + + self.job_files + .delete + .get(file_level) + .expect("all compaction levels covered") + .record(delete.len() as u64); + self.job_bytes + .delete + .get(file_level) + .expect("all compaction levels covered") + .record(delete.iter().map(|f| f.file_size_bytes as u64).sum::()); + self.job_rows + .delete + .get(file_level) + .expect("all compaction levels covered") + .record(delete.iter().map(|f| f.row_count as u64).sum::()); + + self.job_files + .upgrade + .get(&(*file_level, target_level)) + .expect("all compaction levels covered") + .record(upgrade.len() as u64); + self.job_bytes + .upgrade + .get(&(*file_level, target_level)) + .expect("all compaction levels covered") + .record( + upgrade + .iter() + .map(|f| f.file_size_bytes as u64) + .sum::(), + ); + self.job_rows + .upgrade + .get(&(*file_level, target_level)) + .expect("all compaction levels covered") + .record(upgrade.iter().map(|f| f.row_count as u64).sum::()); + } ids } @@ -230,9 +328,13 @@ mod tests { .with_file_size_bytes(10_001) .with_row_count(1_001) .build(); - let existing_2 = ParquetFileBuilder::new(2) + let existing_2a = ParquetFileBuilder::new(2) .with_file_size_bytes(10_002) .with_row_count(1_002) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let existing_2b = ParquetFileBuilder::from(existing_2a.clone()) + .with_compaction_level(CompactionLevel::FileNonOverlapped) .build(); let existing_3 = ParquetFileBuilder::new(3) .with_file_size_bytes(10_004) @@ -247,6 +349,7 @@ mod tests { .with_file_size_bytes(10_016) .with_row_count(1_016) .with_partition(1) + .with_compaction_level(CompactionLevel::Initial) .build(); for metric_name in [ @@ -256,9 +359,52 @@ mod tests { METRIC_NAME_JOB_FILES, METRIC_NAME_JOB_ROWS, ] { - for op in ["create", "delete", "upgrade"] { - assert_eq!(hist_count(®istry, metric_name, op), 0); - assert_eq!(hist_total(®istry, metric_name, op), 0); + for file_level in CompactionLevel::all() { + for op in ["create", "delete"] { + assert_eq!( + hist_count( + ®istry, + metric_name, + [("op", op), ("level", file_level.name())] + ), + 0 + ); + assert_eq!( + hist_total( + ®istry, + metric_name, + [("op", op), ("level", file_level.name())] + ), + 0 + ); + } + + for target_level in CompactionLevel::all() { + assert_eq!( + hist_count( + ®istry, + metric_name, + [ + ("op", "upgrade"), + ("from", file_level.name()), + ("to", target_level.name()) + ] + ), + 0 + ); + assert_eq!( + hist_total( + ®istry, + metric_name, + [ + ("op", "upgrade"), + ("from", file_level.name()), + ("to", target_level.name()) + ] + ), + 0 + ); + } } } @@ -266,7 +412,7 @@ mod tests { .commit( PartitionId::new(1), &[existing_1.clone()], - &[existing_2.clone()], + &[existing_2a.clone()], &[created.clone().into()], CompactionLevel::FileNonOverlapped, ) @@ -276,7 +422,7 @@ mod tests { let ids = commit .commit( PartitionId::new(2), - &[existing_2.clone(), existing_3.clone()], + &[existing_2b.clone(), existing_3.clone()], &[existing_4.clone()], &[], CompactionLevel::Final, @@ -284,19 +430,68 @@ mod tests { .await; assert_eq!(ids, vec![]); - assert_eq!(hist_count(®istry, METRIC_NAME_FILE_BYTES, "create"), 1); - assert_eq!(hist_count(®istry, METRIC_NAME_FILE_BYTES, "upgrade"), 2); - assert_eq!(hist_count(®istry, METRIC_NAME_FILE_BYTES, "delete"), 3); assert_eq!( - hist_total(®istry, METRIC_NAME_FILE_BYTES, "create"), + hist_count( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "create"), ("level", "L0")] + ), + 1 + ); + assert_eq!( + hist_count( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "upgrade"), ("from", "L0"), ("to", "L1")] + ), + 1 + ); + assert_eq!( + hist_count( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "upgrade"), ("from", "L1"), ("to", "L2")] + ), + 1 + ); + assert_eq!( + hist_count( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "delete"), ("level", "L1")] + ), + 3 + ); + assert_eq!( + hist_total( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "create"), ("level", "L0")] + ), 10_016 ); assert_eq!( - hist_total(®istry, METRIC_NAME_FILE_BYTES, "upgrade"), - 20_010 + hist_total( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "upgrade"), ("from", "L0"), ("to", "L1")] + ), + 10_002 ); assert_eq!( - hist_total(®istry, METRIC_NAME_FILE_BYTES, "delete"), + hist_total( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "upgrade"), ("from", "L1"), ("to", "L2")] + ), + 10_008 + ); + assert_eq!( + hist_total( + ®istry, + METRIC_NAME_FILE_BYTES, + [("op", "delete"), ("level", "L1")] + ), 30_007 ); @@ -306,13 +501,13 @@ mod tests { CommitHistoryEntry { partition_id: PartitionId::new(1), delete: vec![existing_1], - upgrade: vec![existing_2.clone()], + upgrade: vec![existing_2a.clone()], created: vec![created], target_level: CompactionLevel::FileNonOverlapped, }, CommitHistoryEntry { partition_id: PartitionId::new(2), - delete: vec![existing_2, existing_3], + delete: vec![existing_2b, existing_3], upgrade: vec![existing_4], created: vec![], target_level: CompactionLevel::Final, @@ -321,24 +516,32 @@ mod tests { ); } - fn hist( + fn hist( registry: &Registry, metric_name: &'static str, - op: &'static str, + attributes: [(&'static str, &'static str); N], ) -> HistogramObservation { registry .get_instrument::>(metric_name) .expect("instrument not found") - .get_observer(&Attributes::from(&[("op", op)])) + .get_observer(&Attributes::from(&attributes)) .expect("observer not found") .fetch() } - fn hist_count(registry: &Registry, metric_name: &'static str, op: &'static str) -> u64 { - hist(registry, metric_name, op).sample_count() + fn hist_count( + registry: &Registry, + metric_name: &'static str, + attributes: [(&'static str, &'static str); N], + ) -> u64 { + hist(registry, metric_name, attributes).sample_count() } - fn hist_total(registry: &Registry, metric_name: &'static str, op: &'static str) -> u64 { - hist(registry, metric_name, op).total + fn hist_total( + registry: &Registry, + metric_name: &'static str, + attributes: [(&'static str, &'static str); N], + ) -> u64 { + hist(registry, metric_name, attributes).total } } diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 62e91551e4..7c98eee08d 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -87,7 +87,7 @@ impl CompactionLevel { match self { Self::Initial => Self::FileNonOverlapped, Self::FileNonOverlapped => Self::Final, - _ => Self::Final, + Self::Final => Self::Final, } } @@ -96,7 +96,21 @@ impl CompactionLevel { match self { Self::Initial => Self::Initial, Self::FileNonOverlapped => Self::Initial, - _ => Self::FileNonOverlapped, + Self::Final => Self::FileNonOverlapped, + } + } + + /// Returns all levels + pub fn all() -> &'static [Self] { + &[Self::Initial, Self::FileNonOverlapped, Self::Final] + } + + /// Static name + pub fn name(&self) -> &'static str { + match self { + Self::Initial => "L0", + Self::FileNonOverlapped => "L1", + Self::Final => "L2", } } } diff --git a/iox_tests/src/builders.rs b/iox_tests/src/builders.rs index 785be2ad5b..b371992289 100644 --- a/iox_tests/src/builders.rs +++ b/iox_tests/src/builders.rs @@ -4,7 +4,7 @@ use data_types::{ }; use uuid::Uuid; -#[derive(Debug)] +#[derive(Debug, Clone)] /// Build up [`ParquetFile`]s for testing pub struct ParquetFileBuilder { file: ParquetFile, @@ -93,6 +93,12 @@ impl ParquetFileBuilder { } } +impl From for ParquetFileBuilder { + fn from(file: ParquetFile) -> Self { + Self { file } + } +} + #[derive(Debug)] /// Build [`Table`]s for testing pub struct TableBuilder { From 4a30df0e07759ed344976d45b539899d4333f2b9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 15 Feb 2023 09:37:50 +0000 Subject: [PATCH 2/6] chore(deps): Bump once_cell from 1.17.0 to 1.17.1 (#6991) Bumps [once_cell](https://github.com/matklad/once_cell) from 1.17.0 to 1.17.1. - [Release notes](https://github.com/matklad/once_cell/releases) - [Changelog](https://github.com/matklad/once_cell/blob/master/CHANGELOG.md) - [Commits](https://github.com/matklad/once_cell/compare/v1.17.0...v1.17.1) --- updated-dependencies: - dependency-name: once_cell dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 433b2dd6f0..01393b00c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3858,9 +3858,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" +checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" dependencies = [ "parking_lot_core 0.9.7", ] From 61fb92b85c11587bb9eb881b8f494156be098a6a Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 10 Feb 2023 16:16:28 +0100 Subject: [PATCH 3/6] feat(router): soft-delete RPC handler This implements the RPC "delete_namespace" handler, allowing a namespace to be soft-deleted. Adds unit coverage for all handlers & e2e test coverage for the new handler (the rest were already covered). The tests also highlight the caching issue documented here: https://github.com/influxdata/influxdb_iox/issues/6175 --- router/tests/grpc.rs | 132 ++++++++++++++++++++++++++ service_grpc_namespace/src/lib.rs | 151 +++++++++++++++++++++++++++--- 2 files changed, 272 insertions(+), 11 deletions(-) diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index 4350703793..cee23ae581 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -134,6 +134,138 @@ async fn test_namespace_create() { assert_eq!(response.status(), StatusCode::NO_CONTENT); } +/// Ensure invoking the gRPC NamespaceService to delete a namespace propagates +/// the catalog and denies writes after the cache has converged / router +/// restarted. +#[tokio::test] +async fn test_namespace_delete() { + // Initialise a TestContext requiring explicit namespace creation. + let ctx = TestContext::new(true, None).await; + + const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _; + + // Explicitly create the namespace. + let req = CreateNamespaceRequest { + name: "bananas_test".to_string(), + retention_period_ns: Some(RETENTION), + }; + let got = ctx + .grpc_delegate() + .namespace_service() + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + + assert_eq!(got.name, "bananas_test"); + assert_eq!(got.id, 1); + assert_eq!(got.retention_period_ns, Some(RETENTION)); + + // The namespace is usable. + let now = SystemProvider::default() + .now() + .timestamp_nanos() + .to_string(); + let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now; + let response = ctx + .write_lp("bananas", "test", &lp) + .await + .expect("write failed"); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // The RPC endpoint must not return the namespace. + { + let current = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert!(!current.namespaces.is_empty()); + } + + // Delete the namespace + { + let _resp = ctx + .grpc_delegate() + .namespace_service() + .delete_namespace(Request::new(DeleteNamespaceRequest { + name: "bananas_test".to_string(), + })) + .await + .expect("must delete"); + } + + // The RPC endpoint must not return the namespace. + { + let current = ctx + .grpc_delegate() + .namespace_service() + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner(); + assert!(current.namespaces.is_empty()); + } + + // The catalog should contain the namespace, but "soft-deleted". + { + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::ExcludeDeleted) + .await + .expect("query failure"); + assert!(db_list.is_empty()); + + let db_list = ctx + .catalog() + .repositories() + .await + .namespaces() + .list(SoftDeletedRows::OnlyDeleted) + .await + .expect("query failure"); + assert_matches!(db_list.as_slice(), [ns] => { + assert_eq!(ns.id.get(), got.id); + assert_eq!(ns.name, got.name); + assert_eq!(ns.retention_period_ns, got.retention_period_ns); + assert!(ns.deleted_at.is_some()); + }); + } + + // The cached entry is not affected, and writes continue to be validated + // against cached entry. + // + // https://github.com/influxdata/influxdb_iox/issues/6175 + + let response = ctx + .write_lp("bananas", "test", &lp) + .await + .expect("write failed"); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // The router restarts, and writes are no longer accepted for the + // soft-deleted bucket. + let ctx = ctx.restart(); + + let err = ctx + .write_lp("bananas", "test", lp) + .await + .expect_err("write should fail"); + assert_matches!( + err, + router::server::http::Error::NamespaceResolver(router::namespace_resolver::Error::Lookup( + iox_catalog::interface::Error::NamespaceNotFoundByName { .. } + )) + ); +} + /// Ensure creating a namespace with a retention period of 0 maps to "infinite" /// and not "none". #[tokio::test] diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index 876a7f6ab5..da47fe3b3b 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -97,12 +97,24 @@ impl namespace_service_server::NamespaceService for NamespaceService { async fn delete_namespace( &self, - _request: Request, + request: Request, ) -> Result, Status> { - warn!("call to namespace delete - unimplemented"); - Err(Status::unimplemented( - "namespace delete is not yet supported", - )) + let namespace_name = request.into_inner().name; + + self.catalog + .repositories() + .await + .namespaces() + .soft_delete(&namespace_name) + .await + .map_err(|e| { + warn!(error=%e, %namespace_name, "failed to soft-delete namespace"); + Status::internal(e.to_string()) + })?; + + info!(namespace_name, "soft-deleted namespace"); + + Ok(Response::new(Default::default())) } async fn update_namespace_retention( @@ -179,21 +191,138 @@ fn map_retention_period(v: Option) -> Result, Status> { #[cfg(test)] mod tests { + use std::time::Duration; + + use assert_matches::assert_matches; + use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService as _; + use iox_catalog::mem::MemCatalog; use tonic::Code; use super::*; + const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _; + const NS_NAME: &str = "bananas"; + #[test] fn test_retention_mapping() { - assert_matches::assert_matches!(map_retention_period(None), Ok(None)); - assert_matches::assert_matches!(map_retention_period(Some(0)), Ok(None)); - assert_matches::assert_matches!(map_retention_period(Some(1)), Ok(Some(1))); - assert_matches::assert_matches!(map_retention_period(Some(42)), Ok(Some(42))); - assert_matches::assert_matches!(map_retention_period(Some(-1)), Err(e) => { + assert_matches!(map_retention_period(None), Ok(None)); + assert_matches!(map_retention_period(Some(0)), Ok(None)); + assert_matches!(map_retention_period(Some(1)), Ok(Some(1))); + assert_matches!(map_retention_period(Some(42)), Ok(Some(42))); + assert_matches!(map_retention_period(Some(-1)), Err(e) => { assert_eq!(e.code(), Code::InvalidArgument) }); - assert_matches::assert_matches!(map_retention_period(Some(-42)), Err(e) => { + assert_matches!(map_retention_period(Some(-42)), Err(e) => { assert_eq!(e.code(), Code::InvalidArgument) }); } + + #[tokio::test] + async fn test_crud() { + let catalog: Arc = + Arc::new(MemCatalog::new(Arc::new(metric::Registry::default()))); + + let topic = catalog + .repositories() + .await + .topics() + .create_or_get("kafka-topic") + .await + .unwrap(); + let query_pool = catalog + .repositories() + .await + .query_pools() + .create_or_get("query-pool") + .await + .unwrap(); + + let handler = NamespaceService::new(catalog, Some(topic.id), Some(query_pool.id)); + + // There should be no namespaces to start with. + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert!(current.is_empty()); + } + + let req = CreateNamespaceRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(RETENTION), + }; + let created_ns = handler + .create_namespace(Request::new(req)) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(created_ns.name, NS_NAME); + assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); + + // There should now be one namespace + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert_matches!(current.as_slice(), [ns] => { + assert_eq!(ns, &created_ns); + }) + } + + // Update the retention period + let updated_ns = handler + .update_namespace_retention(Request::new(UpdateNamespaceRetentionRequest { + name: NS_NAME.to_string(), + retention_period_ns: Some(0), // A zero! + })) + .await + .expect("failed to create namespace") + .into_inner() + .namespace + .expect("no namespace in response"); + assert_eq!(updated_ns.name, created_ns.name); + assert_eq!(updated_ns.id, created_ns.id); + assert_eq!(created_ns.retention_period_ns, Some(42)); + assert_eq!(updated_ns.retention_period_ns, None); + + // Listing the namespaces should return the updated namespace + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert_matches!(current.as_slice(), [ns] => { + assert_eq!(ns, &updated_ns); + }) + } + + // Deleting the namespace should cause it to disappear + handler + .delete_namespace(Request::new(DeleteNamespaceRequest { + name: NS_NAME.to_string(), + })) + .await + .expect("must delete"); + + // Listing the namespaces should now return nothing. + { + let current = handler + .get_namespaces(Request::new(Default::default())) + .await + .expect("must return namespaces") + .into_inner() + .namespaces; + assert_matches!(current.as_slice(), []); + } + } } From dbe9219a8b0cfe0caf13fcf047de1a99673a90ac Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 14 Feb 2023 19:12:44 +0100 Subject: [PATCH 4/6] test: assert correct retention period Use the correct constant in the retention period assert. --- service_grpc_namespace/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index da47fe3b3b..a69ea01033 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -290,7 +290,7 @@ mod tests { .expect("no namespace in response"); assert_eq!(updated_ns.name, created_ns.name); assert_eq!(updated_ns.id, created_ns.id); - assert_eq!(created_ns.retention_period_ns, Some(42)); + assert_eq!(created_ns.retention_period_ns, Some(RETENTION)); assert_eq!(updated_ns.retention_period_ns, None); // Listing the namespaces should return the updated namespace From 5ca165b76e1b3a792cf1de9a31b4726070f92484 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 15 Feb 2023 10:49:07 +0100 Subject: [PATCH 5/6] docs: fix two typos Comments in test code. --- router/tests/grpc.rs | 2 +- service_grpc_namespace/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/router/tests/grpc.rs b/router/tests/grpc.rs index cee23ae581..b65846b468 100644 --- a/router/tests/grpc.rs +++ b/router/tests/grpc.rs @@ -175,7 +175,7 @@ async fn test_namespace_delete() { .expect("write failed"); assert_eq!(response.status(), StatusCode::NO_CONTENT); - // The RPC endpoint must not return the namespace. + // The RPC endpoint must return a namespace. { let current = ctx .grpc_delegate() diff --git a/service_grpc_namespace/src/lib.rs b/service_grpc_namespace/src/lib.rs index a69ea01033..79fc7603bc 100644 --- a/service_grpc_namespace/src/lib.rs +++ b/service_grpc_namespace/src/lib.rs @@ -284,7 +284,7 @@ mod tests { retention_period_ns: Some(0), // A zero! })) .await - .expect("failed to create namespace") + .expect("failed to update namespace") .into_inner() .namespace .expect("no namespace in response"); From 1805f7fbe39cdab61e9609230d46c416246ae5f1 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 17 Jan 2023 11:36:36 +0100 Subject: [PATCH 6/6] refactor: SequenceNumberSet::as_bytes -> to_bytes The conversion is not (always) a cheap cast/conversion, and therefore should be prefixed with "to_" and not "as_". --- data_types/src/sequence_number_set.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs index 97a332a8d3..306b388fd4 100644 --- a/data_types/src/sequence_number_set.rs +++ b/data_types/src/sequence_number_set.rs @@ -31,12 +31,12 @@ impl SequenceNumberSet { self.0.andnot_inplace(&other.0) } - /// Serialise `self` into a set of bytes. + /// Serialise `self` into an array of bytes. /// /// [This document][spec] describes the serialised format. /// /// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/ - pub fn as_bytes(&self) -> Vec { + pub fn to_bytes(&self) -> Vec { self.0.serialize() }