Merge pull request #6795 from influxdata/dom/0-retention
fix(router): 0 value retention periodspull/24376/head
commit
a8cb3aeba8
|
@ -5231,6 +5231,7 @@ dependencies = [
|
|||
name = "service_grpc_namespace"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"assert_matches",
|
||||
"data_types",
|
||||
"generated_types",
|
||||
"iox_catalog",
|
||||
|
|
|
@ -7,10 +7,11 @@ use generated_types::influxdata::iox::namespace::v1::{
|
|||
use hyper::StatusCode;
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use router::{
|
||||
dml_handlers::{DmlError, RetentionError},
|
||||
namespace_resolver::{self, NamespaceCreationError},
|
||||
server::http::Error,
|
||||
};
|
||||
use tonic::Request;
|
||||
use tonic::{Code, Request};
|
||||
|
||||
use crate::common::TestContext;
|
||||
|
||||
|
@ -83,7 +84,7 @@ async fn test_namespace_create() {
|
|||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.expect("failed to create namesapce")
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
|
@ -130,3 +131,287 @@ async fn test_namespace_create() {
|
|||
.expect("write failed");
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
}
|
||||
|
||||
/// Ensure creating a namespace with a retention period of 0 maps to "infinite"
|
||||
/// and not "none".
|
||||
#[tokio::test]
|
||||
async fn test_create_namespace_0_retention_period() {
|
||||
// Initialise a TestContext requiring explicit namespace creation.
|
||||
let ctx = TestContext::new(false, None).await;
|
||||
|
||||
// Explicitly create the namespace.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(0), // A zero!
|
||||
};
|
||||
let got = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
|
||||
assert_eq!(got.name, "bananas_test");
|
||||
assert_eq!(got.id, 1);
|
||||
assert_eq!(got.retention_period_ns, None);
|
||||
|
||||
// The list namespace RPC should show the new namespace
|
||||
{
|
||||
let list = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner();
|
||||
assert_matches!(list.namespaces.as_slice(), [ns] => {
|
||||
assert_eq!(*ns, got);
|
||||
});
|
||||
}
|
||||
|
||||
// The catalog should contain the namespace.
|
||||
{
|
||||
let db_list = ctx
|
||||
.catalog()
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.list()
|
||||
.await
|
||||
.expect("query failure");
|
||||
assert_matches!(db_list.as_slice(), [ns] => {
|
||||
assert_eq!(ns.id.get(), got.id);
|
||||
assert_eq!(ns.name, got.name);
|
||||
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
|
||||
});
|
||||
}
|
||||
|
||||
// And writing should succeed
|
||||
let response = ctx
|
||||
.write_lp("bananas", "test", "platanos,tag1=A,tag2=B val=42i 42424242")
|
||||
.await
|
||||
.expect("write failed");
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
}
|
||||
|
||||
/// Ensure creating a namespace with a negative retention period is rejected.
|
||||
#[tokio::test]
|
||||
async fn test_create_namespace_negative_retention_period() {
|
||||
// Initialise a TestContext requiring explicit namespace creation.
|
||||
let ctx = TestContext::new(false, None).await;
|
||||
|
||||
// Explicitly create the namespace.
|
||||
let req = CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(-42),
|
||||
};
|
||||
let err = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(req))
|
||||
.await
|
||||
.expect_err("negative retention period should fail to create namespace");
|
||||
|
||||
assert_eq!(err.code(), Code::InvalidArgument);
|
||||
assert_eq!(err.message(), "invalid negative retention period");
|
||||
|
||||
// The list namespace RPC should not show a new namespace
|
||||
{
|
||||
let list = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner();
|
||||
assert!(list.namespaces.is_empty());
|
||||
}
|
||||
|
||||
// The catalog should not contain the namespace.
|
||||
{
|
||||
let db_list = ctx
|
||||
.catalog()
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.list()
|
||||
.await
|
||||
.expect("query failure");
|
||||
assert!(db_list.is_empty());
|
||||
}
|
||||
|
||||
// And writing should not succeed
|
||||
let response = ctx
|
||||
.write_lp("bananas", "test", "platanos,tag1=A,tag2=B val=42i 42424242")
|
||||
.await
|
||||
.expect_err("write should fail");
|
||||
assert_matches!(
|
||||
response,
|
||||
Error::NamespaceResolver(namespace_resolver::Error::Create(
|
||||
NamespaceCreationError::Reject(_)
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
/// Ensure updating a namespace with a retention period of 0 maps to "infinite"
|
||||
/// and not "none".
|
||||
#[tokio::test]
|
||||
async fn test_update_namespace_0_retention_period() {
|
||||
// Initialise a TestContext requiring explicit namespace creation.
|
||||
let ctx = TestContext::new(false, None).await;
|
||||
|
||||
// Explicitly create the namespace.
|
||||
let create = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(42),
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
|
||||
// And writing in the past should fail
|
||||
ctx.write_lp("bananas", "test", "platanos,tag1=A,tag2=B val=42i 42424242")
|
||||
.await
|
||||
.expect_err("write outside retention period should fail");
|
||||
|
||||
let got = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.update_namespace_retention(Request::new(UpdateNamespaceRetentionRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(0), // A zero!
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
|
||||
assert_eq!(got.name, create.name);
|
||||
assert_eq!(got.id, create.id);
|
||||
assert_eq!(create.retention_period_ns, Some(42));
|
||||
assert_eq!(got.retention_period_ns, None);
|
||||
|
||||
// The list namespace RPC should show the updated namespace
|
||||
{
|
||||
let list = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner();
|
||||
assert_matches!(list.namespaces.as_slice(), [ns] => {
|
||||
assert_eq!(*ns, got);
|
||||
});
|
||||
}
|
||||
|
||||
// The catalog should contain the namespace.
|
||||
{
|
||||
let db_list = ctx
|
||||
.catalog()
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.list()
|
||||
.await
|
||||
.expect("query failure");
|
||||
assert_matches!(db_list.as_slice(), [ns] => {
|
||||
assert_eq!(ns.id.get(), got.id);
|
||||
assert_eq!(ns.name, got.name);
|
||||
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
|
||||
});
|
||||
}
|
||||
|
||||
// The cached entry is not affected, and writes continue to be validated
|
||||
// against the old value.
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/6175
|
||||
|
||||
let err = ctx
|
||||
.write_lp("bananas", "test", "platanos,tag1=A,tag2=B val=42i 42424242")
|
||||
.await
|
||||
.expect_err("cached entry rejects write");
|
||||
|
||||
assert_matches!(
|
||||
err,
|
||||
router::server::http::Error::DmlHandler(DmlError::Retention(
|
||||
RetentionError::OutsideRetention(name)
|
||||
)) => {
|
||||
assert_eq!(name, "platanos");
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/// Ensure updating a namespace with a negative retention period fails.
|
||||
#[tokio::test]
|
||||
async fn test_update_namespace_negative_retention_period() {
|
||||
// Initialise a TestContext requiring explicit namespace creation.
|
||||
let ctx = TestContext::new(false, None).await;
|
||||
|
||||
// Explicitly create the namespace.
|
||||
let create = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.create_namespace(Request::new(CreateNamespaceRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(42),
|
||||
}))
|
||||
.await
|
||||
.expect("failed to create namespace")
|
||||
.into_inner()
|
||||
.namespace
|
||||
.expect("no namespace in response");
|
||||
|
||||
let err = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.update_namespace_retention(Request::new(UpdateNamespaceRetentionRequest {
|
||||
name: "bananas_test".to_string(),
|
||||
retention_period_ns: Some(-42),
|
||||
}))
|
||||
.await
|
||||
.expect_err("negative retention period should fail to create namespace");
|
||||
|
||||
assert_eq!(err.code(), Code::InvalidArgument);
|
||||
assert_eq!(err.message(), "invalid negative retention period");
|
||||
|
||||
// The list namespace RPC should show the original namespace
|
||||
{
|
||||
let list = ctx
|
||||
.grpc_delegate()
|
||||
.namespace_service()
|
||||
.get_namespaces(Request::new(Default::default()))
|
||||
.await
|
||||
.expect("must return namespaces")
|
||||
.into_inner();
|
||||
assert_matches!(list.namespaces.as_slice(), [ns] => {
|
||||
assert_eq!(*ns, create);
|
||||
});
|
||||
}
|
||||
|
||||
// The catalog should contain the original namespace.
|
||||
{
|
||||
let db_list = ctx
|
||||
.catalog()
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.list()
|
||||
.await
|
||||
.expect("query failure");
|
||||
assert_matches!(db_list.as_slice(), [ns] => {
|
||||
assert_eq!(ns.id.get(), create.id);
|
||||
assert_eq!(ns.name, create.name);
|
||||
assert_eq!(ns.retention_period_ns, create.retention_period_ns);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ iox_catalog = { path = "../iox_catalog" }
|
|||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
||||
[dev-dependencies]
|
||||
assert_matches = "1.5.0"
|
||||
iox_tests = { path = "../iox_tests" }
|
||||
metric = { path = "../metric" }
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
|
||||
|
|
|
@ -64,6 +64,8 @@ impl namespace_service_server::NamespaceService for NamespaceService {
|
|||
retention_period_ns,
|
||||
} = request.into_inner();
|
||||
|
||||
let retention_period_ns = map_retention_period(retention_period_ns)?;
|
||||
|
||||
debug!(%namespace_name, ?retention_period_ns, "Creating namespace");
|
||||
|
||||
let namespace = repos
|
||||
|
@ -110,6 +112,8 @@ impl namespace_service_server::NamespaceService for NamespaceService {
|
|||
retention_period_ns,
|
||||
} = request.into_inner();
|
||||
|
||||
let retention_period_ns = map_retention_period(retention_period_ns)?;
|
||||
|
||||
debug!(%namespace_name, ?retention_period_ns, "Updating namespace retention");
|
||||
|
||||
let namespace = repos
|
||||
|
@ -151,3 +155,41 @@ fn create_namespace_to_proto(namespace: CatalogNamespace) -> CreateNamespaceResp
|
|||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a user-submitted retention period value to the correct internal
|
||||
/// encoding.
|
||||
///
|
||||
/// 0 is always mapped to [`None`], indicating infinite retention.
|
||||
///
|
||||
/// Negative retention periods are rejected with an error.
|
||||
fn map_retention_period(v: Option<i64>) -> Result<Option<i64>, Status> {
|
||||
match v {
|
||||
Some(0) => Ok(None),
|
||||
Some(v @ 1..) => Ok(Some(v)),
|
||||
Some(_v @ ..=0) => Err(Status::invalid_argument(
|
||||
"invalid negative retention period",
|
||||
)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tonic::Code;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_retention_mapping() {
|
||||
assert_matches::assert_matches!(map_retention_period(None), Ok(None));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(0)), Ok(None));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(1)), Ok(Some(1)));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(42)), Ok(Some(42)));
|
||||
assert_matches::assert_matches!(map_retention_period(Some(-1)), Err(e) => {
|
||||
assert_eq!(e.code(), Code::InvalidArgument)
|
||||
});
|
||||
assert_matches::assert_matches!(map_retention_period(Some(-42)), Err(e) => {
|
||||
assert_eq!(e.code(), Code::InvalidArgument)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue