Merge pull request #6720 from influxdata/dom/fix-namespace-rpc

fix(router): reject new namespaces / deny namespace auto-creation
Dom 2023-01-26 17:03:39 +00:00 committed by GitHub
commit ccc5cdcd80
8 changed files with 497 additions and 316 deletions
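
At a glance, the behavioural change: with namespace auto-creation disabled, the resolver now asks the catalog before rejecting a write, so namespaces that already exist in the catalog remain writable and only genuinely unknown ones are turned away. The sketch below is a minimal, self-contained illustration of that decision flow; the synchronous signatures, the i64-backed NamespaceId, and catalog_lookup are simplified stand-ins for illustration, not the router crate's real async APIs.

// Simplified stand-in types; the real ones live in the router / iox_catalog crates.
#[derive(Debug, Clone, Copy, PartialEq)]
struct NamespaceId(i64);

enum MissingNamespaceAction {
    Reject,
    // Optional retention period in nanoseconds, mirroring AutoCreate(Option<i64>) in the diff.
    AutoCreate(Option<i64>),
}

#[derive(Debug)]
enum ResolveError {
    NotFound(String),
    Rejected(String),
}

// Stand-in for the catalog lookup performed by the inner resolver; in the
// real code this is an async catalog query.
fn catalog_lookup(name: &str) -> Result<NamespaceId, ResolveError> {
    if name == "existing_ns" {
        Ok(NamespaceId(1))
    } else {
        Err(ResolveError::NotFound(name.to_string()))
    }
}

fn get_namespace_id(
    name: &str,
    action: &MissingNamespaceAction,
) -> Result<NamespaceId, ResolveError> {
    match catalog_lookup(name) {
        Ok(id) => Ok(id),
        // Only a "not found" lookup error consults the configured action;
        // the old code rejected before ever asking the catalog.
        Err(ResolveError::NotFound(_)) => match action {
            MissingNamespaceAction::Reject => Err(ResolveError::Rejected(name.to_string())),
            MissingNamespaceAction::AutoCreate(_retention_ns) => {
                // The real handler creates the namespace in the catalog here.
                Ok(NamespaceId(2))
            }
        },
        // Any other error is propagated unchanged.
        Err(e) => Err(e),
    }
}

fn main() {
    assert!(get_namespace_id("existing_ns", &MissingNamespaceAction::Reject).is_ok());
    assert!(matches!(
        get_namespace_id("new_ns", &MissingNamespaceAction::Reject),
        Err(ResolveError::Rejected(_))
    ));
    assert!(get_namespace_id("new_ns", &MissingNamespaceAction::AutoCreate(None)).is_ok());
}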


@@ -7,7 +7,13 @@ use iox_catalog::interface::Catalog;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorSource},
reexport::tonic::transport::Endpoint,
reexport::{
generated_types::influxdata::iox::{
catalog::v1::catalog_service_server, namespace::v1::namespace_service_server,
object_store::v1::object_store_service_server, schema::v1::schema_service_server,
},
tonic::transport::Endpoint,
},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
@@ -207,9 +213,26 @@ where
/// [`RpcWriteGrpcDelegate`]: router::server::grpc::RpcWriteGrpcDelegate
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().schema_service());
add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().object_store_service());
add_service!(
builder,
schema_service_server::SchemaServiceServer::new(self.server.grpc().schema_service())
);
add_service!(
builder,
catalog_service_server::CatalogServiceServer::new(self.server.grpc().catalog_service())
);
add_service!(
builder,
object_store_service_server::ObjectStoreServiceServer::new(
self.server.grpc().object_store_service()
)
);
add_service!(
builder,
namespace_service_server::NamespaceServiceServer::new(
self.server.grpc().namespace_service()
)
);
serve_builder!(builder);
Ok(())


@@ -36,10 +36,10 @@ impl NamespaceResolver for MockNamespaceResolver {
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, super::Error> {
Ok(*self
.map
.lock()
.get(namespace)
.expect("mock namespace resolver does not have ID"))
Ok(*self.map.lock().get(namespace).ok_or(super::Error::Lookup(
iox_catalog::interface::Error::NamespaceNotFoundByName {
name: namespace.to_string(),
},
))?)
}
}


@@ -91,15 +91,27 @@ where
if self.cache.get_schema(namespace).is_none() {
trace!(%namespace, "namespace not found in cache");
let mut repos = self.catalog.repositories().await;
match self.action {
MissingNamespaceAction::Reject => {
debug!(%namespace, "namespace not in catalog and autocreation disabled");
return Err(NamespaceCreationError::Reject(namespace.into()).into());
// The namespace is not cached, but may exist in the
// catalog. Delegate discovery down to the inner handler,
// and map the lookup error to a reject error.
match self.inner.get_namespace_id(namespace).await {
Ok(v) => return Ok(v),
Err(super::Error::Lookup(
iox_catalog::interface::Error::NamespaceNotFoundByName { .. },
)) => {
warn!(%namespace, "namespace not in catalog and auto-creation disabled");
return Err(NamespaceCreationError::Reject(namespace.into()).into());
}
Err(e) => return Err(e),
}
}
MissingNamespaceAction::AutoCreate(retention_period_ns) => {
match repos
match self
.catalog
.repositories()
.await
.namespaces()
.create(
namespace.as_str(),
@@ -141,7 +153,8 @@ mod tests {
use super::*;
use crate::{
namespace_cache::MemoryNamespaceCache, namespace_resolver::mock::MockNamespaceResolver,
namespace_cache::MemoryNamespaceCache,
namespace_resolver::{mock::MockNamespaceResolver, NamespaceSchemaResolver},
};
/// Common retention period value we'll use in tests
@@ -256,7 +269,7 @@ mod tests {
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let creator = NamespaceAutocreation::new(
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
MockNamespaceResolver::default(),
cache,
Arc::clone(&catalog),
TopicId::new(42),
@@ -276,4 +289,45 @@ mod tests {
let mut repos = catalog.repositories().await;
assert_matches!(repos.namespaces().get_by_name(ns.as_str()).await, Ok(None));
}
#[tokio::test]
async fn test_reject_exists_in_catalog() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
// First drive the population of the catalog
let creator = NamespaceAutocreation::new(
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)),
Arc::clone(&cache),
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS),
);
let created_id = creator
.get_namespace_id(&ns)
.await
.expect("handler should succeed");
// Now try in "reject" mode.
let creator = NamespaceAutocreation::new(
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)),
cache,
Arc::clone(&catalog),
TopicId::new(42),
QueryPoolId::new(42),
MissingNamespaceAction::Reject,
);
// It should not autocreate because we specified "rejection" behaviour, above
let id = creator
.get_namespace_id(&ns)
.await
.expect("should allow existing namespace from catalog");
assert_eq!(created_id, id);
}
}


@@ -49,49 +49,33 @@ impl RpcWriteGrpcDelegate {
/// Acquire a [`SchemaService`] gRPC service implementation.
///
/// [`SchemaService`]: generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService.
pub fn schema_service(&self) -> schema_service_server::SchemaServiceServer<SchemaService> {
schema_service_server::SchemaServiceServer::new(SchemaService::new(Arc::clone(
&self.catalog,
)))
pub fn schema_service(&self) -> SchemaService {
SchemaService::new(Arc::clone(&self.catalog))
}
/// Acquire a [`CatalogService`] gRPC service implementation.
///
/// [`CatalogService`]: generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService.
pub fn catalog_service(
&self,
) -> catalog_service_server::CatalogServiceServer<impl catalog_service_server::CatalogService>
{
catalog_service_server::CatalogServiceServer::new(CatalogService::new(Arc::clone(
&self.catalog,
)))
pub fn catalog_service(&self) -> impl catalog_service_server::CatalogService {
CatalogService::new(Arc::clone(&self.catalog))
}
/// Acquire a [`ObjectStoreService`] gRPC service implementation.
///
/// [`ObjectStoreService`]: generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService.
pub fn object_store_service(
&self,
) -> object_store_service_server::ObjectStoreServiceServer<
impl object_store_service_server::ObjectStoreService,
> {
object_store_service_server::ObjectStoreServiceServer::new(ObjectStoreService::new(
Arc::clone(&self.catalog),
Arc::clone(&self.object_store),
))
pub fn object_store_service(&self) -> impl object_store_service_server::ObjectStoreService {
ObjectStoreService::new(Arc::clone(&self.catalog), Arc::clone(&self.object_store))
}
/// Acquire a [`NamespaceService`] gRPC service implementation.
///
/// [`NamespaceService`]: generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService.
pub fn namespace_service(
&self,
) -> namespace_service_server::NamespaceServiceServer<NamespaceService> {
namespace_service_server::NamespaceServiceServer::new(NamespaceService::new(
pub fn namespace_service(&self) -> impl namespace_service_server::NamespaceService {
NamespaceService::new(
Arc::clone(&self.catalog),
Some(self.topic_id),
Some(self.query_id),
))
)
}
}


@@ -85,7 +85,7 @@ pub enum Error {
/// name into a [`NamespaceId`].
///
/// [`NamespaceId`]: data_types::NamespaceId
#[error("failed to resolve namespace ID: {0}")]
#[error(transparent)]
NamespaceResolver(#[from] crate::namespace_resolver::Error),
/// The router is currently servicing the maximum permitted number of
@@ -561,7 +561,7 @@ mod tests {
use super::*;
use crate::{
dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall},
namespace_resolver::mock::MockNamespaceResolver,
namespace_resolver::{mock::MockNamespaceResolver, NamespaceCreationError},
};
use assert_matches::assert_matches;
use data_types::{NamespaceId, NamespaceNameError};
@@ -1513,10 +1513,16 @@ mod tests {
crate::namespace_resolver::Error::Lookup(e)
}),
"failed to resolve namespace ID: \
failed to resolve namespace ID: \
name [name] already exists",
),
(
NamespaceResolver(
crate::namespace_resolver::Error::Create(NamespaceCreationError::Reject("bananas".to_string()))
),
"rejecting write due to non-existing namespace: bananas",
),
(
RequestLimit,
"this service is overloaded, please try again later",

router/tests/common/mod.rs (new file, 220 lines)

@@ -0,0 +1,220 @@
use std::{collections::BTreeSet, iter, string::String, sync::Arc};
use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId};
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use metric::Registry;
use mutable_batch::MutableBatch;
use object_store::memory::InMemory;
use router::{
dml_handlers::{
Chain, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned,
Partitioner, RetentionValidator, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
server::{grpc::RpcWriteGrpcDelegate, http::HttpDelegate},
shard::Shard,
};
use sharder::JumpHash;
use write_buffer::{
core::WriteBufferWriting,
mock::{MockBufferForWriting, MockBufferSharedState},
};
/// The topic catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
pub const TEST_TOPIC_ID: i64 = 1;
/// The query pool catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
pub const TEST_QUERY_POOL_ID: i64 = 1;
/// Common retention period value we'll use in tests
pub const TEST_RETENTION_PERIOD_NS: Option<i64> = Some(3_600 * 1_000_000_000);
#[derive(Debug)]
pub struct TestContext {
http_delegate: HttpDelegateStack,
grpc_delegate: RpcWriteGrpcDelegate,
catalog: Arc<dyn Catalog>,
write_buffer_state: Arc<MockBufferSharedState>,
metrics: Arc<Registry>,
}
// This mass of words is certainly a downside of chained handlers.
//
// Fortunately the compiler errors are very descriptive and updating this is
// relatively easy when something changes!
type HttpDelegateStack = HttpDelegate<
InstrumentationDecorator<
Chain<
Chain<
Chain<
RetentionValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
SchemaValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
>,
Partitioner,
>,
WriteSummaryAdapter<
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Shard>>>,
Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
>,
>,
>,
>,
NamespaceAutocreation<
Arc<ShardedCache<Arc<MemoryNamespaceCache>>>,
NamespaceSchemaResolver<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
>,
>;
/// A [`router`] stack configured with the various DML handlers using mock
/// catalog / write buffer backends.
impl TestContext {
pub async fn new(autocreate_ns: bool, ns_autocreate_retention_period_ns: Option<i64>) -> Self {
let metrics = Arc::new(metric::Registry::default());
let time = iox_time::MockProvider::new(
iox_time::Time::from_timestamp_millis(668563200000).unwrap(),
);
let write_buffer = MockBufferForWriting::new(
MockBufferSharedState::empty_with_n_shards(1.try_into().unwrap()),
None,
Arc::new(time),
)
.expect("failed to init mock write buffer");
let write_buffer_state = write_buffer.state();
let write_buffer: Arc<dyn WriteBufferWriting> = Arc::new(write_buffer);
let shards: BTreeSet<_> = write_buffer.shard_indexes();
let sharded_write_buffer = ShardedWriteBuffer::new(JumpHash::new(
shards
.into_iter()
.map(|shard_index| Shard::new(shard_index, Arc::clone(&write_buffer), &metrics))
.map(Arc::new),
));
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
let retention_validator =
RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let handler_stack = retention_validator
.and_then(schema_validator)
.and_then(partitioner)
.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(
sharded_write_buffer,
)));
let handler_stack = InstrumentationDecorator::new("request", &metrics, handler_stack);
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let namespace_resolver = NamespaceAutocreation::new(
namespace_resolver,
Arc::clone(&ns_cache),
Arc::clone(&catalog),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
{
if autocreate_ns {
MissingNamespaceAction::AutoCreate(ns_autocreate_retention_period_ns)
} else {
MissingNamespaceAction::Reject
}
},
);
let http_delegate =
HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);
let grpc_delegate = RpcWriteGrpcDelegate::new(
Arc::clone(&catalog),
Arc::new(InMemory::default()),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
);
Self {
http_delegate,
grpc_delegate,
catalog,
write_buffer_state,
metrics,
}
}
/// Get a reference to the test context's http delegate.
pub fn http_delegate(&self) -> &HttpDelegateStack {
&self.http_delegate
}
/// Get a reference to the test context's grpc delegate.
pub fn grpc_delegate(&self) -> &RpcWriteGrpcDelegate {
&self.grpc_delegate
}
/// Get a reference to the test context's catalog.
pub fn catalog(&self) -> Arc<dyn Catalog> {
Arc::clone(&self.catalog)
}
/// Get a reference to the test context's write buffer state.
pub fn write_buffer_state(&self) -> &Arc<MockBufferSharedState> {
&self.write_buffer_state
}
/// Get a reference to the test context's metrics.
pub fn metrics(&self) -> &Registry {
self.metrics.as_ref()
}
/// Return the [`TableId`] in the catalog for `name` in `namespace`, or panic.
pub async fn table_id(&self, namespace: &str, name: &str) -> TableId {
let mut repos = self.catalog.repositories().await;
let namespace_id = repos
.namespaces()
.get_by_name(namespace)
.await
.expect("query failed")
.expect("namespace does not exist")
.id;
repos
.tables()
.get_by_namespace_and_name(namespace_id, name)
.await
.expect("query failed")
.expect("no table entry for the specified namespace/table name pair")
.id
}
/// A helper method to write LP to this [`TestContext`].
pub async fn write_lp(
&self,
org: &str,
bucket: &str,
lp: impl Into<String>,
) -> Result<Response<Body>, router::server::http::Error> {
let request = Request::builder()
.uri(format!(
"https://bananas.example/api/v2/write?org={org}&bucket={bucket}"
))
.method("POST")
.body(Body::from(lp.into()))
.expect("failed to construct HTTP request");
self.http_delegate().route(request).await
}
}

router/tests/grpc.rs (new file, 132 lines)

@@ -0,0 +1,132 @@
use std::time::Duration;
use assert_matches::assert_matches;
use generated_types::influxdata::iox::namespace::v1::{
namespace_service_server::NamespaceService, *,
};
use hyper::StatusCode;
use iox_time::{SystemProvider, TimeProvider};
use router::{
namespace_resolver::{self, NamespaceCreationError},
server::http::Error,
};
use tonic::Request;
use crate::common::TestContext;
pub mod common;
/// Ensure invoking the gRPC NamespaceService to create a namespace populates
/// the catalog.
#[tokio::test]
async fn test_namespace_create() {
// Initialise a TestContext requiring explicit namespace creation.
let ctx = TestContext::new(false, None).await;
// Try writing to the non-existent namespace, which should return an error.
let now = SystemProvider::default()
.now()
.timestamp_nanos()
.to_string();
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
let response = ctx
.write_lp("bananas", "test", &lp)
.await
.expect_err("write failed");
assert_matches!(
response,
Error::NamespaceResolver(namespace_resolver::Error::Create(
NamespaceCreationError::Reject(_)
))
);
assert_eq!(
response.to_string(),
"rejecting write due to non-existing namespace: bananas_test"
);
// The failed write MUST NOT populate the catalog.
{
let current = ctx
.catalog()
.repositories()
.await
.namespaces()
.list()
.await
.expect("failed to query for existing namespaces");
assert!(current.is_empty());
}
// The RPC endpoint must know nothing about the namespace either.
{
let current = ctx
.grpc_delegate()
.namespace_service()
.get_namespaces(Request::new(Default::default()))
.await
.expect("must return namespaces")
.into_inner();
assert!(current.namespaces.is_empty());
}
const RETENTION: i64 = Duration::from_secs(42 * 60 * 60).as_nanos() as _;
// Explicitly create the namespace.
let req = CreateNamespaceRequest {
name: "bananas_test".to_string(),
retention_period_ns: Some(RETENTION),
};
let got = ctx
.grpc_delegate()
.namespace_service()
.create_namespace(Request::new(req))
.await
.expect("failed to create namesapce")
.into_inner()
.namespace
.expect("no namespace in response");
assert_eq!(got.name, "bananas_test");
assert_eq!(got.id, 1);
assert_eq!(got.retention_period_ns, Some(RETENTION));
// The list namespace RPC should show the new namespace
{
let list = ctx
.grpc_delegate()
.namespace_service()
.get_namespaces(Request::new(Default::default()))
.await
.expect("must return namespaces")
.into_inner();
assert_matches!(list.namespaces.as_slice(), [ns] => {
assert_eq!(*ns, got);
});
}
// The catalog should contain the namespace.
{
let db_list = ctx
.catalog()
.repositories()
.await
.namespaces()
.list()
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), got.id);
assert_eq!(ns.name, got.name);
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
});
}
// And writing should succeed
let response = ctx
.write_lp("bananas", "test", lp)
.await
.expect("write failed");
assert_eq!(response.status(), StatusCode::NO_CONTENT);
}
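
Since auto-creation can now be rejected, namespaces must be created explicitly, as the in-process NamespaceService call in the test above does. The sketch below shows the equivalent call over the wire with a tonic client; the namespace_service_client::NamespaceServiceClient path follows tonic's usual codegen convention and the endpoint address is a placeholder, neither of which is confirmed by this diff.

// Hedged sketch, not part of this PR: create a namespace over gRPC instead
// of through the in-process delegate used by the test above.
use generated_types::influxdata::iox::namespace::v1::{
    namespace_service_client::NamespaceServiceClient, CreateNamespaceRequest,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address; point this at the router's gRPC listener.
    let mut client = NamespaceServiceClient::connect("http://127.0.0.1:8082").await?;

    let created = client
        .create_namespace(CreateNamespaceRequest {
            name: "bananas_test".to_string(),
            // 42h retention, expressed in nanoseconds as in the test above.
            retention_period_ns: Some(42 * 60 * 60 * 1_000_000_000),
        })
        .await?
        .into_inner();

    println!("created namespace: {:?}", created.namespace);
    Ok(())
}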


@@ -1,205 +1,22 @@
use std::{collections::BTreeSet, iter, string::String, sync::Arc};
use std::sync::Arc;
use assert_matches::assert_matches;
use data_types::{
ColumnType, PartitionTemplate, QueryPoolId, ShardIndex, TableId, TemplatePart, TopicId,
};
use data_types::{ColumnType, QueryPoolId, ShardIndex, TopicId};
use dml::DmlOperation;
use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::HashMap;
use hyper::{Body, Request, StatusCode};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, Metric, Registry, U64Counter};
use mutable_batch::MutableBatch;
use router::{
dml_handlers::{
Chain, DmlError, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned,
Partitioner, RetentionError, RetentionValidator, SchemaError, SchemaValidator,
ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
server::http::HttpDelegate,
shard::Shard,
};
use sharder::JumpHash;
use write_buffer::{
core::WriteBufferWriting,
mock::{MockBufferForWriting, MockBufferSharedState},
};
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
/// The topic catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
const TEST_TOPIC_ID: i64 = 1;
use crate::common::{TestContext, TEST_QUERY_POOL_ID, TEST_RETENTION_PERIOD_NS, TEST_TOPIC_ID};
/// The query pool catalog ID assigned by the namespace auto-creator in the
/// handler stack for namespaces it has not yet observed.
const TEST_QUERY_POOL_ID: i64 = 1;
/// Common retention period value we'll use in tests
const TEST_RETENTION_PERIOD_NS: Option<i64> = Some(3_600 * 1_000_000_000);
pub struct TestContext {
delegate: HttpDelegateStack,
catalog: Arc<dyn Catalog>,
write_buffer_state: Arc<MockBufferSharedState>,
metrics: Arc<Registry>,
}
// This mass of words is certainly a downside of chained handlers.
//
// Fortunately the compiler errors are very descriptive and updating this is
// relatively easy when something changes!
type HttpDelegateStack = HttpDelegate<
InstrumentationDecorator<
Chain<
Chain<
Chain<
RetentionValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
SchemaValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
>,
Partitioner,
>,
WriteSummaryAdapter<
FanOutAdaptor<
ShardedWriteBuffer<JumpHash<Arc<Shard>>>,
Vec<Partitioned<HashMap<TableId, (String, MutableBatch)>>>,
>,
>,
>,
>,
NamespaceAutocreation<
Arc<ShardedCache<Arc<MemoryNamespaceCache>>>,
NamespaceSchemaResolver<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
>,
>;
/// A [`router`] stack configured with the various DML handlers using mock
/// catalog / write buffer backends.
impl TestContext {
pub fn new(autocreate_ns: bool, ns_autocreate_retention_period_ns: Option<i64>) -> Self {
let metrics = Arc::new(metric::Registry::default());
let time = iox_time::MockProvider::new(
iox_time::Time::from_timestamp_millis(668563200000).unwrap(),
);
let write_buffer = MockBufferForWriting::new(
MockBufferSharedState::empty_with_n_shards(1.try_into().unwrap()),
None,
Arc::new(time),
)
.expect("failed to init mock write buffer");
let write_buffer_state = write_buffer.state();
let write_buffer: Arc<dyn WriteBufferWriting> = Arc::new(write_buffer);
let shards: BTreeSet<_> = write_buffer.shard_indexes();
let sharded_write_buffer = ShardedWriteBuffer::new(JumpHash::new(
shards
.into_iter()
.map(|shard_index| Shard::new(shard_index, Arc::clone(&write_buffer), &metrics))
.map(Arc::new),
));
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
let retention_validator =
RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &metrics);
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let handler_stack = retention_validator
.and_then(schema_validator)
.and_then(partitioner)
.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(
sharded_write_buffer,
)));
let handler_stack = InstrumentationDecorator::new("request", &metrics, handler_stack);
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let namespace_resolver = NamespaceAutocreation::new(
namespace_resolver,
Arc::clone(&ns_cache),
Arc::clone(&catalog),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
{
if autocreate_ns {
MissingNamespaceAction::AutoCreate(ns_autocreate_retention_period_ns)
} else {
MissingNamespaceAction::Reject
}
},
);
let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);
Self {
delegate,
catalog,
write_buffer_state,
metrics,
}
}
/// Get a reference to the test context's delegate.
pub fn delegate(&self) -> &HttpDelegateStack {
&self.delegate
}
/// Get a reference to the test context's catalog.
pub fn catalog(&self) -> Arc<dyn Catalog> {
Arc::clone(&self.catalog)
}
/// Get a reference to the test context's write buffer state.
pub fn write_buffer_state(&self) -> &Arc<MockBufferSharedState> {
&self.write_buffer_state
}
/// Get a reference to the test context's metrics.
pub fn metrics(&self) -> &Registry {
self.metrics.as_ref()
}
/// Return the [`TableId`] in the catalog for `name` in `namespace`, or panic.
pub async fn table_id(&self, namespace: &str, name: &str) -> TableId {
let mut repos = self.catalog.repositories().await;
let namespace_id = repos
.namespaces()
.get_by_name(namespace)
.await
.expect("query failed")
.expect("namespace does not exist")
.id;
repos
.tables()
.get_by_namespace_and_name(namespace_id, name)
.await
.expect("query failed")
.expect("no table entry for the specified namespace/table name pair")
.id
}
}
impl Default for TestContext {
fn default() -> Self {
Self::new(true, None)
}
}
pub mod common;
#[tokio::test]
async fn test_write_ok() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// Write data inside retention period
let now = SystemProvider::default()
@@ -207,19 +24,10 @@ async fn test_write_ok() {
.timestamp_nanos()
.to_string();
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect("LP write request failed");
.expect("write failed");
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Check the write buffer observed the correct write.
@@ -285,27 +93,20 @@ async fn test_write_ok() {
#[tokio::test]
async fn test_write_outside_retention_period() {
let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS);
let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS).await;
// Write data outside retention period into a new table
let two_hours_ago =
(SystemProvider::default().now().timestamp_nanos() - 2 * 3_600 * 1_000_000_000).to_string();
let lp = "apple,tag1=AAA,tag2=BBB val=422i ".to_string() + &two_hours_ago;
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.route(request)
let response = ctx
.write_lp("bananas", "test", lp)
.await
.expect_err("LP write request should fail");
.expect_err("write should fail");
assert_matches!(
&err,
&response,
router::server::http::Error::DmlHandler(
DmlError::Retention(
RetentionError::OutsideRetention(e))
@@ -313,12 +114,12 @@ async fn test_write_outside_retention_period() {
assert_eq!(e, "apple");
}
);
assert_eq!(err.as_status_code(), StatusCode::FORBIDDEN);
assert_eq!(response.as_status_code(), StatusCode::FORBIDDEN);
}
#[tokio::test]
async fn test_schema_conflict() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// data inside the retention period
let now = SystemProvider::default()
@@ -327,17 +128,10 @@ async fn test_schema_conflict() {
.to_string();
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect("LP write request failed");
.expect("write should succeed");
assert_eq!(response.status(), StatusCode::NO_CONTENT);
@@ -347,17 +141,10 @@ async fn test_schema_conflict() {
.to_string();
let lp = "platanos,tag1=A,tag2=B val=42.0 ".to_string() + &now;
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect_err("LP write request should fail");
.expect_err("write should fail");
assert_matches!(
&err,
@@ -384,7 +171,7 @@ async fn test_schema_conflict() {
#[tokio::test]
async fn test_rejected_ns() {
let ctx = TestContext::new(false, None);
let ctx = TestContext::new(false, None).await;
let now = SystemProvider::default()
.now()
@@ -392,17 +179,11 @@ async fn test_rejected_ns() {
.to_string();
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect_err("should error");
.expect_err("write should fail");
assert_matches!(
err,
router::server::http::Error::NamespaceResolver(
@@ -417,7 +198,7 @@ async fn test_rejected_ns() {
#[tokio::test]
async fn test_schema_limit() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
let now = SystemProvider::default()
.now()
@@ -426,16 +207,10 @@ async fn test_schema_limit() {
let lp = "platanos,tag1=A,tag2=B val=42i ".to_string() + &now;
// Drive the creation of the namespace
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect("LP write request failed");
.expect("write should succeed");
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Update the table limit
@@ -454,16 +229,10 @@ async fn test_schema_limit() {
.to_string();
let lp = "platanos2,tag1=A,tag2=B val=42i ".to_string() + &now;
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect_err("LP write request should fail");
.expect_err("write should fail");
assert_matches!(
&err,
@@ -483,7 +252,7 @@ async fn test_schema_limit() {
#[tokio::test]
async fn test_write_propagate_ids() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// Create the namespace and a set of tables.
let ns = ctx
@@ -536,17 +305,10 @@ async fn test_write_propagate_ids() {
};
let request = Request::builder()
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
.method("POST")
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.route(request)
.write_lp("bananas", "test", lp)
.await
.expect("LP write request failed");
.expect("write should succeed");
assert_eq!(response.status(), StatusCode::NO_CONTENT);
@@ -564,7 +326,7 @@ async fn test_write_propagate_ids() {
#[tokio::test]
async fn test_delete_propagate_ids() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// Create the namespace and a set of tables.
let ns = ctx
@@ -594,7 +356,7 @@ async fn test_delete_propagate_ids() {
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect("delete request failed");