From 6f1869f9dcd10890897258e75bf456992e2a1a1e Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 26 Jan 2023 15:41:14 +0100 Subject: [PATCH] test(router): initialise gRPC delegate in e2e Initialise the "rpc mode" gRPC handlers in the router e2e TestContext. --- router/tests/common/mod.rs | 41 +++++++++++++++++++++++++------------- router/tests/http.rs | 32 ++++++++++++++--------------- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs index 595ef8b39b..778045b69a 100644 --- a/router/tests/common/mod.rs +++ b/router/tests/common/mod.rs @@ -1,3 +1,5 @@ +#![allow(unused)] + use std::{collections::BTreeSet, iter, string::String, sync::Arc}; use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId}; @@ -5,6 +7,7 @@ use hashbrown::HashMap; use iox_catalog::{interface::Catalog, mem::MemCatalog}; use metric::Registry; use mutable_batch::MutableBatch; +use object_store::memory::InMemory; use router::{ dml_handlers::{ Chain, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioned, @@ -12,7 +15,7 @@ use router::{ }, namespace_cache::{MemoryNamespaceCache, ShardedCache}, namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver}, - server::http::HttpDelegate, + server::{grpc::RpcWriteGrpcDelegate, http::HttpDelegate}, shard::Shard, }; use sharder::JumpHash; @@ -32,8 +35,10 @@ pub const TEST_QUERY_POOL_ID: i64 = 1; /// Common retention period value we'll use in tests pub const TEST_RETENTION_PERIOD_NS: Option = Some(3_600 * 1_000_000_000); +#[derive(Debug)] pub struct TestContext { - delegate: HttpDelegateStack, + http_delegate: HttpDelegateStack, + grpc_delegate: RpcWriteGrpcDelegate, catalog: Arc, write_buffer_state: Arc, metrics: Arc, @@ -70,7 +75,7 @@ type HttpDelegateStack = HttpDelegate< /// A [`router`] stack configured with the various DML handlers using mock /// catalog / write buffer backends. impl TestContext { - pub fn new(autocreate_ns: bool, ns_autocreate_retention_period_ns: Option) -> Self { + pub async fn new(autocreate_ns: bool, ns_autocreate_retention_period_ns: Option) -> Self { let metrics = Arc::new(metric::Registry::default()); let time = iox_time::MockProvider::new( iox_time::Time::from_timestamp_millis(668563200000).unwrap(), @@ -132,19 +137,33 @@ impl TestContext { }, ); - let delegate = HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics); + let http_delegate = + HttpDelegate::new(1024, 100, namespace_resolver, handler_stack, &metrics); + + let grpc_delegate = RpcWriteGrpcDelegate::new( + Arc::clone(&catalog), + Arc::new(InMemory::default()), + TopicId::new(TEST_TOPIC_ID), + QueryPoolId::new(TEST_QUERY_POOL_ID), + ); Self { - delegate, + http_delegate, + grpc_delegate, catalog, write_buffer_state, metrics, } } - /// Get a reference to the test context's delegate. - pub fn delegate(&self) -> &HttpDelegateStack { - &self.delegate + /// Get a reference to the test context's http delegate. + pub fn http_delegate(&self) -> &HttpDelegateStack { + &self.http_delegate + } + + /// Get a reference to the test context's grpc delegate. + pub fn grpc_delegate(&self) -> &RpcWriteGrpcDelegate { + &self.grpc_delegate } /// Get a reference to the test context's catalog. @@ -182,9 +201,3 @@ impl TestContext { .id } } - -impl Default for TestContext { - fn default() -> Self { - Self::new(true, None) - } -} diff --git a/router/tests/http.rs b/router/tests/http.rs index 281836ec68..c8d7d42137 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -16,7 +16,7 @@ pub mod common; #[tokio::test] async fn test_write_ok() { - let ctx = TestContext::new(true, None); + let ctx = TestContext::new(true, None).await; // Write data inside retention period let now = SystemProvider::default() @@ -32,7 +32,7 @@ async fn test_write_ok() { .expect("failed to construct HTTP request"); let response = ctx - .delegate() + .http_delegate() .route(request) .await .expect("LP write request failed"); @@ -102,7 +102,7 @@ async fn test_write_ok() { #[tokio::test] async fn test_write_outside_retention_period() { - let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS); + let ctx = TestContext::new(true, TEST_RETENTION_PERIOD_NS).await; // Write data outside retention period into a new table let two_hours_ago = @@ -116,7 +116,7 @@ async fn test_write_outside_retention_period() { .expect("failed to construct HTTP request"); let err = ctx - .delegate() + .http_delegate() .route(request) .await .expect_err("LP write request should fail"); @@ -135,7 +135,7 @@ async fn test_write_outside_retention_period() { #[tokio::test] async fn test_schema_conflict() { - let ctx = TestContext::new(true, None); + let ctx = TestContext::new(true, None).await; // data inside the retention period let now = SystemProvider::default() @@ -151,7 +151,7 @@ async fn test_schema_conflict() { .expect("failed to construct HTTP request"); let response = ctx - .delegate() + .http_delegate() .route(request) .await .expect("LP write request failed"); @@ -171,7 +171,7 @@ async fn test_schema_conflict() { .expect("failed to construct HTTP request"); let err = ctx - .delegate() + .http_delegate() .route(request) .await .expect_err("LP write request should fail"); @@ -201,7 +201,7 @@ async fn test_schema_conflict() { #[tokio::test] async fn test_rejected_ns() { - let ctx = TestContext::new(false, None); + let ctx = TestContext::new(false, None).await; let now = SystemProvider::default() .now() @@ -216,7 +216,7 @@ async fn test_rejected_ns() { .expect("failed to construct HTTP request"); let err = ctx - .delegate() + .http_delegate() .route(request) .await .expect_err("should error"); @@ -234,7 +234,7 @@ async fn test_rejected_ns() { #[tokio::test] async fn test_schema_limit() { - let ctx = TestContext::new(true, None); + let ctx = TestContext::new(true, None).await; let now = SystemProvider::default() .now() @@ -249,7 +249,7 @@ async fn test_schema_limit() { .body(Body::from(lp)) .expect("failed to construct HTTP request"); let response = ctx - .delegate() + .http_delegate() .route(request) .await .expect("LP write request failed"); @@ -277,7 +277,7 @@ async fn test_schema_limit() { .body(Body::from(lp)) .expect("failed to construct HTTP request"); let err = ctx - .delegate() + .http_delegate() .route(request) .await .expect_err("LP write request should fail"); @@ -300,7 +300,7 @@ async fn test_schema_limit() { #[tokio::test] async fn test_write_propagate_ids() { - let ctx = TestContext::new(true, None); + let ctx = TestContext::new(true, None).await; // Create the namespace and a set of tables. let ns = ctx @@ -360,7 +360,7 @@ async fn test_write_propagate_ids() { .expect("failed to construct HTTP request"); let response = ctx - .delegate() + .http_delegate() .route(request) .await .expect("LP write request failed"); @@ -381,7 +381,7 @@ async fn test_write_propagate_ids() { #[tokio::test] async fn test_delete_propagate_ids() { - let ctx = TestContext::new(true, None); + let ctx = TestContext::new(true, None).await; // Create the namespace and a set of tables. let ns = ctx @@ -411,7 +411,7 @@ async fn test_delete_propagate_ids() { .expect("failed to construct HTTP request"); let response = ctx - .delegate() + .http_delegate() .route(request) .await .expect("delete request failed");