test(router): initialise gRPC delegate in e2e

Initialise the "rpc mode" gRPC handlers in the router e2e TestContext.
pull/24376/head
Dom Dwyer 2023-01-26 15:41:14 +01:00
parent 3efc42baac
commit 6f1869f9dc
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
2 changed files with 43 additions and 30 deletions

View File

@ -1,3 +1,5 @@
#![allow(unused)]
use std::{collections::BTreeSet, iter, string::String, sync::Arc};
use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId};
@ -5,6 +7,7 @@ use hashbrown::HashMap;
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use metric::Registry;
use mutable_batch::MutableBatch;
use object_store::memory::InMemory;
use router::{
dml_handlers::{
Chain, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned,
@ -12,7 +15,7 @@ use router::{
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
server::http::HttpDelegate,
server::{grpc::RpcWriteGrpcDelegate, http::HttpDelegate},
shard::Shard,
};
use sharder::JumpHash;
@ -32,8 +35,10 @@ pub const TEST_QUERY_POOL_ID: i64 = 1;
/// Common retention period value we'll use in tests
pub const TEST_RETENTION_PERIOD_NS: Option<i64> = Some(3_600 * 1_000_000_000);
#[derive(Debug)]
pub struct TestContext {
delegate: HttpDelegateStack,
http_delegate: HttpDelegateStack,
grpc_delegate: RpcWriteGrpcDelegate,
catalog: Arc<dyn Catalog>,
write_buffer_state: Arc<MockBufferSharedState>,
metrics: Arc<Registry>,
@ -70,7 +75,7 @@ type HttpDelegateStack = HttpDelegate<
/// A [`router`] stack configured with the various DML handlers using mock
/// catalog / write buffer backends.
impl TestContext {
pub fn new(autocreate_ns: bool, ns_autocreate_retention_period_ns: Option<i64>) -> Self {
pub async fn new(autocreate_ns: bool, ns_autocreate_retention_period_ns: Option<i64>) -> Self {
let metrics = Arc::new(metric::Registry::default());
let time = iox_time::MockProvider::new(
iox_time::Time::from_timestamp_millis(668563200000).unwrap(),
@ -132,19 +137,33 @@ impl TestContext {
},
);
let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);
let http_delegate =
HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics);
let grpc_delegate = RpcWriteGrpcDelegate::new(
Arc::clone(&catalog),
Arc::new(InMemory::default()),
TopicId::new(TEST_TOPIC_ID),
QueryPoolId::new(TEST_QUERY_POOL_ID),
);
Self {
delegate,
http_delegate,
grpc_delegate,
catalog,
write_buffer_state,
metrics,
}
}
/// Get a reference to the test context's delegate.
pub fn delegate(&self) -> &HttpDelegateStack {
&self.delegate
/// Get a reference to the test context's http delegate.
pub fn http_delegate(&self) -> &HttpDelegateStack {
&self.http_delegate
}
/// Get a reference to the test context's grpc delegate.
pub fn grpc_delegate(&self) -> &RpcWriteGrpcDelegate {
&self.grpc_delegate
}
/// Get a reference to the test context's catalog.
@ -182,9 +201,3 @@ impl TestContext {
.id
}
}
impl Default for TestContext {
fn default() -> Self {
Self::new(true, None)
}
}

View File

@ -16,7 +16,7 @@ pub mod common;
#[tokio::test]
async fn test_write_ok() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// Write data inside retention period
let now = SystemProvider::default()
@ -32,7 +32,7 @@ async fn test_write_ok() {
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect("LP write request failed");
@ -102,7 +102,7 @@ async fn test_write_ok() {
#[tokio::test]
async fn test_write_outside_retention_period() {
let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS);
let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS).await;
// Write data outside retention period into a new table
let two_hours_ago =
@ -116,7 +116,7 @@ async fn test_write_outside_retention_period() {
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect_err("LP write request should fail");
@ -135,7 +135,7 @@ async fn test_write_outside_retention_period() {
#[tokio::test]
async fn test_schema_conflict() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// data inside the retention period
let now = SystemProvider::default()
@ -151,7 +151,7 @@ async fn test_schema_conflict() {
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect("LP write request failed");
@ -171,7 +171,7 @@ async fn test_schema_conflict() {
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect_err("LP write request should fail");
@ -201,7 +201,7 @@ async fn test_schema_conflict() {
#[tokio::test]
async fn test_rejected_ns() {
let ctx = TestContext::new(false, None);
let ctx = TestContext::new(false, None).await;
let now = SystemProvider::default()
.now()
@ -216,7 +216,7 @@ async fn test_rejected_ns() {
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect_err("should error");
@ -234,7 +234,7 @@ async fn test_rejected_ns() {
#[tokio::test]
async fn test_schema_limit() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
let now = SystemProvider::default()
.now()
@ -249,7 +249,7 @@ async fn test_schema_limit() {
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect("LP write request failed");
@ -277,7 +277,7 @@ async fn test_schema_limit() {
.body(Body::from(lp))
.expect("failed to construct HTTP request");
let err = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect_err("LP write request should fail");
@ -300,7 +300,7 @@ async fn test_schema_limit() {
#[tokio::test]
async fn test_write_propagate_ids() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// Create the namespace and a set of tables.
let ns = ctx
@ -360,7 +360,7 @@ async fn test_write_propagate_ids() {
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect("LP write request failed");
@ -381,7 +381,7 @@ async fn test_write_propagate_ids() {
#[tokio::test]
async fn test_delete_propagate_ids() {
let ctx = TestContext::new(true, None);
let ctx = TestContext::new(true, None).await;
// Create the namespace and a set of tables.
let ns = ctx
@ -411,7 +411,7 @@ async fn test_delete_propagate_ids() {
.expect("failed to construct HTTP request");
let response = ctx
.delegate()
.http_delegate()
.route(request)
.await
.expect("delete request failed");