From 22936abb2372802dd3e74f10d0e35f07735324f1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 24 Nov 2021 18:22:31 +0100 Subject: [PATCH] refactor: remove connection manager and resolver from `Server` --- .../server_type/database/http.rs | 45 ++--- .../influxdb_ioxd/server_type/database/mod.rs | 27 +-- .../server_type/database/rpc/delete.rs | 21 +- .../server_type/database/rpc/deployment.rs | 22 +-- .../server_type/database/rpc/flight.rs | 16 +- .../server_type/database/rpc/management.rs | 24 +-- .../server_type/database/rpc/mod.rs | 11 +- .../server_type/database/rpc/write_pb.rs | 21 +- .../server_type/database/setup.rs | 13 +- server/src/connection.rs | 185 ------------------ server/src/lib.rs | 77 ++------ server/src/resolver.rs | 54 ----- server/tests/write_buffer_delete.rs | 3 +- 13 files changed, 77 insertions(+), 442 deletions(-) delete mode 100644 server/src/connection.rs delete mode 100644 server/src/resolver.rs diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs index 7fa70d09c1..163b4b6e17 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs @@ -14,7 +14,7 @@ use data_types::{names::OrgBucketMappingError, DatabaseName}; use influxdb_iox_client::format::QueryOutputFormat; use query::exec::ExecutionContextProvider; -use server::{connection::ConnectionManager, Error}; +use server::Error; // External crates use async_trait::async_trait; @@ -162,10 +162,7 @@ impl From for ApplicationError { } #[async_trait] -impl HttpDrivenDml for DatabaseServerType -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl HttpDrivenDml for DatabaseServerType { fn max_request_size(&self) -> usize { self.max_request_size } @@ -222,13 +219,10 @@ where } } -pub async fn route_request( - server_type: &DatabaseServerType, +pub async fn route_request( + server_type: &DatabaseServerType, req: Request, -) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +) -> Result, ApplicationError> { match server_type .route_dml_http_request(req) .await @@ -266,9 +260,9 @@ fn default_format() -> String { QueryOutputFormat::default().to_string() } -async fn query( +async fn query( req: Request, - server_type: &DatabaseServerType, + server_type: &DatabaseServerType, ) -> Result, ApplicationError> { let server = &server_type.server; @@ -340,10 +334,7 @@ mod tests { use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName}; use object_store::ObjectStore; use schema::selection::Selection; - use server::{ - connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, ApplicationState, - Server, - }; + use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, Server}; use trace::RingBufferTraceCollector; fn make_application() -> Arc { @@ -354,12 +345,8 @@ mod tests { )) } - fn make_server(application: Arc) -> Arc> { - Arc::new(Server::new( - ConnectionManagerImpl::new(), - application, - Default::default(), - )) + fn make_server(application: Arc) -> Arc { + Arc::new(Server::new(application, Default::default())) } #[tokio::test] @@ -377,10 +364,7 @@ mod tests { assert_tracing(setup_server().await).await; } - async fn assert_dbwrite( - test_server: TestServer>, - write: DmlWrite, - ) { + async fn assert_dbwrite(test_server: TestServer, write: DmlWrite) { let (table_name, mutable_batch) = write.tables().next().unwrap(); let test_db = test_server @@ -529,10 +513,7 @@ mod tests { /// Sets up a test database with some data for testing the query endpoint /// returns a client for communicating with the server, and the server /// endpoint - async fn setup_test_data() -> ( - Client, - TestServer>, - ) { + async fn setup_test_data() -> (Client, TestServer) { let test_server = setup_server().await; let client = Client::new(); @@ -689,7 +670,7 @@ mod tests { } /// return a test server and the url to contact it for `MyOrg_MyBucket` - async fn setup_server() -> TestServer> { + async fn setup_server() -> TestServer { let application = make_application(); let app_server = make_server(Arc::clone(&application)); diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs index ed3daa4e7a..4e2577e0b7 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs @@ -5,7 +5,7 @@ use futures::{future::FusedFuture, FutureExt}; use hyper::{Body, Request, Response}; use metric::Registry; use observability_deps::tracing::{error, info}; -use server::{connection::ConnectionManager, ApplicationState, Server}; +use server::{ApplicationState, Server}; use tokio_util::sync::CancellationToken; use trace::TraceCollector; @@ -25,25 +25,19 @@ pub use self::http::ApplicationError; use super::common_state::CommonServerState; #[derive(Debug)] -pub struct DatabaseServerType -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +pub struct DatabaseServerType { pub application: Arc, - pub server: Arc>, + pub server: Arc, pub lp_metrics: Arc, pub max_request_size: usize, pub serving_readiness: ServingReadiness, shutdown: CancellationToken, } -impl DatabaseServerType -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +impl DatabaseServerType { pub fn new( application: Arc, - server: Arc>, + server: Arc, common_state: &CommonServerState, ) -> Self { let lp_metrics = Arc::new(LineProtocolMetrics::new( @@ -62,10 +56,7 @@ where } #[async_trait] -impl ServerType for DatabaseServerType -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +impl ServerType for DatabaseServerType { type RouteError = ApplicationError; fn metric_registry(&self) -> Arc { @@ -132,7 +123,7 @@ mod tests { use data_types::{database_rules::DatabaseRules, DatabaseName}; use futures::pin_mut; use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; - use server::{connection::ConnectionManagerImpl, rules::ProvidedDatabaseRules}; + use server::rules::ProvidedDatabaseRules; use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64}; use structopt::StructOpt; use tokio::task::JoinHandle; @@ -157,7 +148,7 @@ mod tests { async fn test_serve( config: RunConfig, application: Arc, - server: Arc>, + server: Arc, ) { let grpc_listener = grpc_listener(config.grpc_bind_address.into()) .await @@ -348,7 +339,7 @@ mod tests { collector: &Arc, ) -> ( SocketAddr, - Arc>, + Arc, JoinHandle>, ) { let config = test_config(Some(23)); diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs index 90b8005b57..85133af32e 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs @@ -1,4 +1,3 @@ -use std::fmt::Debug; use std::sync::Arc; use data_types::non_empty::NonEmptyString; @@ -6,20 +5,17 @@ use data_types::DatabaseName; use dml::{DmlDelete, DmlMeta}; use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField}; use generated_types::influxdata::iox::delete::v1::*; -use server::{connection::ConnectionManager, Server}; +use server::Server; use tonic::Response; -struct DeleteService { - server: Arc>, +struct DeleteService { + server: Arc, } use super::error::{default_db_error_handler, default_server_error_handler}; #[tonic::async_trait] -impl delete_service_server::DeleteService for DeleteService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl delete_service_server::DeleteService for DeleteService { async fn delete( &self, request: tonic::Request, @@ -50,11 +46,8 @@ where } } -pub fn make_server( - server: Arc>, -) -> delete_service_server::DeleteServiceServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +pub fn make_server( + server: Arc, +) -> delete_service_server::DeleteServiceServer { delete_service_server::DeleteServiceServer::new(DeleteService { server }) } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs index 92e180e1ec..08780dcfe2 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs @@ -3,12 +3,12 @@ use generated_types::{ google::{FieldViolation, NotFound}, influxdata::iox::deployment::v1::*, }; -use server::{connection::ConnectionManager, Error, Server}; -use std::{convert::TryFrom, fmt::Debug, sync::Arc}; +use server::{Error, Server}; +use std::{convert::TryFrom, sync::Arc}; use tonic::{Request, Response, Status}; -struct DeploymentService { - server: Arc>, +struct DeploymentService { + server: Arc, serving_readiness: ServingReadiness, } @@ -16,10 +16,7 @@ use super::error::default_server_error_handler; use crate::influxdb_ioxd::serving_readiness::ServingReadiness; #[tonic::async_trait] -impl deployment_service_server::DeploymentService for DeploymentService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl deployment_service_server::DeploymentService for DeploymentService { async fn get_server_id( &self, _: Request, @@ -69,15 +66,12 @@ where } } -pub fn make_server( - server: Arc>, +pub fn make_server( + server: Arc, serving_readiness: ServingReadiness, ) -> deployment_service_server::DeploymentServiceServer< impl deployment_service_server::DeploymentService, -> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +> { deployment_service_server::DeploymentServiceServer::new(DeploymentService { server, serving_readiness, diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs index f8151b11ee..08ea5a3ec4 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs @@ -25,7 +25,7 @@ use tonic::{Request, Response, Streaming}; use data_types::{DatabaseName, DatabaseNameError}; use observability_deps::tracing::{info, warn}; use query::exec::{ExecutionContextProvider, IOxExecutionContext}; -use server::{connection::ConnectionManager, Server}; +use server::Server; use super::error::default_server_error_handler; use crate::influxdb_ioxd::planner::Planner; @@ -125,22 +125,16 @@ struct ReadInfo { /// Concrete implementation of the gRPC Arrow Flight Service API #[derive(Debug)] -struct FlightService { - server: Arc>, +struct FlightService { + server: Arc, } -pub fn make_server(server: Arc>) -> FlightServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +pub fn make_server(server: Arc) -> FlightServer { FlightServer::new(FlightService { server }) } #[tonic::async_trait] -impl Flight for FlightService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl Flight for FlightService { type HandshakeStream = TonicStream; type ListFlightsStream = TonicStream; type DoGetStream = TonicStream; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index a3f6c5e3b4..eb65507578 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -4,16 +4,14 @@ use generated_types::{ influxdata::iox::management::v1::{Error as ProtobufError, *}, }; use query::QueryDatabase; -use server::{ - connection::ConnectionManager, rules::ProvidedDatabaseRules, ApplicationState, Error, Server, -}; -use std::{convert::TryFrom, fmt::Debug, sync::Arc}; +use server::{rules::ProvidedDatabaseRules, ApplicationState, Error, Server}; +use std::{convert::TryFrom, sync::Arc}; use tonic::{Request, Response, Status}; use uuid::Uuid; -struct ManagementService { +struct ManagementService { application: Arc, - server: Arc>, + server: Arc, } use super::error::{ @@ -21,10 +19,7 @@ use super::error::{ }; #[tonic::async_trait] -impl management_service_server::ManagementService for ManagementService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl management_service_server::ManagementService for ManagementService { async fn list_databases( &self, request: Request, @@ -523,15 +518,12 @@ fn format_rules(provided_rules: Arc, omit_defaults: bool) } } -pub fn make_server( +pub fn make_server( application: Arc, - server: Arc>, + server: Arc, ) -> management_service_server::ManagementServiceServer< impl management_service_server::ManagementService, -> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +> { management_service_server::ManagementServiceServer::new(ManagementService { application, server, diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs index c2322b0703..9ecbc86646 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use server::connection::ConnectionManager; - use crate::influxdb_ioxd::{ rpc::{add_gated_service, add_service, serve_builder, setup_builder, RpcBuilderInput}, server_type::{database::DatabaseServerType, RpcError}, @@ -16,13 +14,10 @@ mod operations; mod storage; mod write_pb; -pub async fn server_grpc( - server_type: Arc>, +pub async fn server_grpc( + server_type: Arc, builder_input: RpcBuilderInput, -) -> Result<(), RpcError> -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +) -> Result<(), RpcError> { let builder = setup_builder!(builder_input, server_type); add_gated_service!( diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs index d285b65b0e..69f861a959 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs @@ -2,21 +2,17 @@ use data_types::DatabaseName; use dml::{DmlMeta, DmlOperation, DmlWrite}; use generated_types::google::{FieldViolation, FieldViolationExt}; use generated_types::influxdata::pbdata::v1::*; -use server::{connection::ConnectionManager, Server}; -use std::fmt::Debug; +use server::Server; use std::sync::Arc; use super::error::{default_database_write_error_handler, default_server_error_handler}; -struct PBWriteService { - server: Arc>, +struct PBWriteService { + server: Arc, } #[tonic::async_trait] -impl write_service_server::WriteService for PBWriteService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl write_service_server::WriteService for PBWriteService { async fn write( &self, request: tonic::Request, @@ -53,11 +49,8 @@ where } } -pub fn make_server( - server: Arc>, -) -> write_service_server::WriteServiceServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +pub fn make_server( + server: Arc, +) -> write_service_server::WriteServiceServer { write_service_server::WriteServiceServer::new(PBWriteService { server }) } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs index c38ccef35e..7a4dc8d4c4 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs @@ -2,9 +2,7 @@ use std::sync::Arc; use object_store::ObjectStore; use observability_deps::tracing::warn; -use server::{ - connection::ConnectionManagerImpl, ApplicationState, RemoteTemplate, Server, ServerConfig, -}; +use server::{ApplicationState, Server, ServerConfig}; use snafu::{ResultExt, Snafu}; use trace::TraceCollector; @@ -47,18 +45,13 @@ pub async fn make_application( ))) } -pub fn make_server( - application: Arc, - config: &Config, -) -> Arc> { +pub fn make_server(application: Arc, config: &Config) -> Arc { let server_config = ServerConfig { - remote_template: config.remote_template.clone().map(RemoteTemplate::new), wipe_catalog_on_error: config.wipe_catalog_on_error.into(), skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(), }; - let connection_manager = ConnectionManagerImpl::new(); - let app_server = Arc::new(Server::new(connection_manager, application, server_config)); + let app_server = Arc::new(Server::new(application, server_config)); // if this ID isn't set the server won't be usable until this is set via an API // call diff --git a/server/src/connection.rs b/server/src/connection.rs deleted file mode 100644 index c5d0d0030e..0000000000 --- a/server/src/connection.rs +++ /dev/null @@ -1,185 +0,0 @@ -use std::sync::Arc; - -use async_trait::async_trait; -use cache_loader_async::cache_api::LoadingCache; -use snafu::{ResultExt, Snafu}; - -use dml::DmlWrite; -use generated_types::influxdata::pbdata::v1::WriteRequest; -use influxdb_iox_client::{connection::Builder, write}; -use observability_deps::tracing::debug; - -type RemoteServerError = Box; - -#[derive(Debug, Snafu)] -pub enum ConnectionManagerError { - #[snafu(display("cannot connect to remote: {}", source))] - RemoteServerConnectError { source: RemoteServerError }, - #[snafu(display("cannot write to remote: {}", source))] - RemoteServerWriteError { source: write::WriteError }, -} - -/// The `Server` will ask the `ConnectionManager` for connections to a specific -/// remote server. These connections can be used to communicate with other -/// servers. This is implemented as a trait for dependency injection in testing. -#[async_trait] -pub trait ConnectionManager { - type RemoteServer: RemoteServer + Send + Sync + 'static; - - async fn remote_server( - &self, - connect: &str, - ) -> Result, ConnectionManagerError>; -} - -/// The `RemoteServer` represents the API for replicating, subscribing, and -/// querying other servers. -#[async_trait] -pub trait RemoteServer { - /// Sends a [`DmlWrite`] to the remote server. An IOx server acting as a - /// router/sharder will call this method to send entries to remotes. - async fn write(&self, db: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError>; -} - -/// The connection manager maps a host identifier to a remote server. -#[derive(Debug)] -pub struct ConnectionManagerImpl { - cache: LoadingCache, CacheFillError>, -} - -// Error must be Clone because LoadingCache requires so. -#[derive(Debug, Snafu, Clone)] -pub enum CacheFillError { - #[snafu(display("gRPC error: {}", source))] - GrpcError { - source: Arc, - }, -} - -impl ConnectionManagerImpl { - pub fn new() -> Self { - let cache = LoadingCache::new(Self::cached_remote_server); - Self { cache } - } - - async fn cached_remote_server( - connect: String, - ) -> Result, CacheFillError> { - let connection = Builder::default() - .build(&connect) - .await - .map_err(|e| Arc::new(e) as _) - .context(GrpcError)?; - let client = write::Client::new(connection); - Ok(Arc::new(RemoteServerImpl { client })) - } -} - -impl Default for ConnectionManagerImpl { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl ConnectionManager for ConnectionManagerImpl { - type RemoteServer = RemoteServerImpl; - - async fn remote_server( - &self, - connect: &str, - ) -> Result, ConnectionManagerError> { - let ret = self - .cache - .get_with_meta(connect.to_string()) - .await - .map_err(|e| Box::new(e) as _) - .context(RemoteServerConnectError)?; - debug!(was_cached=%ret.cached, %connect, "getting remote connection"); - Ok(ret.result) - } -} - -/// An implementation for communicating with other IOx servers. This should -/// be moved into and implemented in an influxdb_iox_client create at a later -/// date. -#[derive(Debug)] -pub struct RemoteServerImpl { - client: write::Client, -} - -#[async_trait] -impl RemoteServer for RemoteServerImpl { - /// Sends a write to the remote server. An IOx server acting as a - /// router/sharder will call this method to send entries to remotes. - async fn write(&self, db_name: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError> { - let data = mutable_batch_pb::encode::encode_write(db_name, write); - self.client - .clone() // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage - .write_pb(WriteRequest { - database_batch: Some(data), - }) - .await - .context(RemoteServerWriteError) - } -} - -pub mod test_helpers { - use std::sync::atomic::{AtomicBool, Ordering}; - - use super::*; - use std::collections::BTreeMap; - - #[derive(Debug)] - pub struct TestConnectionManager { - pub remotes: BTreeMap>, - } - - impl Default for TestConnectionManager { - fn default() -> Self { - Self::new() - } - } - - impl TestConnectionManager { - pub fn new() -> Self { - Self { - remotes: BTreeMap::new(), - } - } - } - - #[async_trait] - impl ConnectionManager for TestConnectionManager { - type RemoteServer = TestRemoteServer; - - async fn remote_server( - &self, - id: &str, - ) -> Result, ConnectionManagerError> { - #[derive(Debug, Snafu)] - enum TestRemoteError { - #[snafu(display("remote not found"))] - NotFound, - } - Ok(Arc::clone(self.remotes.get(id).ok_or_else(|| { - ConnectionManagerError::RemoteServerConnectError { - source: Box::new(TestRemoteError::NotFound), - } - })?)) - } - } - - #[derive(Debug)] - pub struct TestRemoteServer { - pub written: Arc, - } - - #[async_trait] - impl<'a> RemoteServer for TestRemoteServer { - async fn write(&self, _db: &str, _write: &DmlWrite) -> Result<(), ConnectionManagerError> { - self.written.store(true, Ordering::Relaxed); - Ok(()) - } - } -} diff --git a/server/src/lib.rs b/server/src/lib.rs index f544543803..414a2aabe4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -70,7 +70,6 @@ use ::lifecycle::{LockableChunk, LockablePartition}; use async_trait::async_trait; -use connection::ConnectionManager; use data_types::{ chunk_metadata::ChunkId, detailed_database::ActiveDatabase, @@ -87,7 +86,6 @@ use internal_types::freezable::Freezable; use iox_object_store::IoxObjectStore; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; -use resolver::Resolver; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::sync::Arc; use tokio::{sync::Notify, task::JoinError}; @@ -98,14 +96,11 @@ use uuid::Uuid; pub use application::ApplicationState; pub use db::Db; pub use job::JobRegistry; -pub use resolver::{GrpcConnectionString, RemoteTemplate}; mod application; -pub mod connection; pub mod database; pub mod db; mod job; -mod resolver; pub mod rules; use rules::{PersistedDatabaseRules, ProvidedDatabaseRules}; @@ -233,8 +228,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync { /// Configuration options for `Server` #[derive(Debug)] pub struct ServerConfig { - pub remote_template: Option, - pub wipe_catalog_on_error: bool, pub skip_replay_and_seek_instead: bool, @@ -243,7 +236,6 @@ pub struct ServerConfig { impl Default for ServerConfig { fn default() -> Self { Self { - remote_template: None, wipe_catalog_on_error: false, skip_replay_and_seek_instead: false, } @@ -254,20 +246,15 @@ impl Default for ServerConfig { /// well as how they communicate with other servers. Each server will have one /// of these structs, which keeps track of all replication and query rules. #[derive(Debug)] -pub struct Server { - connection_manager: Arc, - +pub struct Server { /// Future that resolves when the background worker exits join: Shared>>>, - /// Resolver for mapping ServerId to gRPC connection strings - resolver: RwLock, - /// State shared with the background worker shared: Arc, } -impl Drop for Server { +impl Drop for Server { fn drop(&mut self) { if !self.shared.shutdown.is_cancelled() { warn!("server dropped without calling shutdown()"); @@ -466,15 +453,8 @@ impl ServerStateInitialized { } } -impl Server -where - M: ConnectionManager + Send + Sync, -{ - pub fn new( - connection_manager: M, - application: Arc, - config: ServerConfig, - ) -> Self { +impl Server { + pub fn new(application: Arc, config: ServerConfig) -> Self { let shared = Arc::new(ServerShared { shutdown: Default::default(), application, @@ -488,12 +468,7 @@ where let handle = tokio::spawn(background_worker(Arc::clone(&shared))); let join = handle.map_err(Arc::new).boxed().shared(); - Self { - shared, - join, - connection_manager: Arc::new(connection_manager), - resolver: RwLock::new(Resolver::new(config.remote_template)), - } + Self { shared, join } } /// sets the id of the server, which is used for replication and the base @@ -860,18 +835,6 @@ where .context(CanNotUpdateRules { db_name })?) } - pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> { - self.resolver.read().remotes_sorted() - } - - pub fn update_remote(&self, id: ServerId, addr: GrpcConnectionString) { - self.resolver.write().update_remote(id, addr) - } - - pub fn delete_remote(&self, id: ServerId) -> Option { - self.resolver.write().delete_remote(id) - } - /// Closes a chunk and starts moving its data to the read buffer, as a /// background job, dropping when complete. pub fn close_chunk( @@ -1115,10 +1078,7 @@ async fn maybe_initialize_server(shared: &ServerShared) { /// TODO: Revisit this trait's API #[async_trait] -impl DatabaseStore for Server -where - M: ConnectionManager + std::fmt::Debug + Send + Sync, -{ +impl DatabaseStore for Server { type Database = Db; type Error = Error; @@ -1148,10 +1108,7 @@ where } #[cfg(test)] -impl Server -where - M: ConnectionManager + Send + Sync, -{ +impl Server { /// For tests: list of database names in this server, regardless /// of their initialization state fn db_names_sorted(&self) -> Vec { @@ -1218,7 +1175,6 @@ async fn database_name_from_rules_file( pub mod test_utils { use super::*; - use crate::connection::test_helpers::TestConnectionManager; use object_store::ObjectStore; /// Create a new [`ApplicationState`] with an in-memory object store @@ -1231,12 +1187,8 @@ pub mod test_utils { } /// Creates a new server with the provided [`ApplicationState`] - pub fn make_server(application: Arc) -> Arc> { - Arc::new(Server::new( - TestConnectionManager::new(), - application, - Default::default(), - )) + pub fn make_server(application: Arc) -> Arc { + Arc::new(Server::new(application, Default::default())) } /// Creates a new server with the provided [`ApplicationState`] @@ -1245,7 +1197,7 @@ pub mod test_utils { pub async fn make_initialized_server( server_id: ServerId, application: Arc, - ) -> Arc> { + ) -> Arc { let server = make_server(application); server.set_id(server_id).unwrap(); server.wait_for_init().await.unwrap(); @@ -1457,13 +1409,10 @@ mod tests { } } - async fn create_simple_database( - server: &Server, + async fn create_simple_database( + server: &Server, name: impl Into + Send, - ) -> Result> - where - M: ConnectionManager + Send + Sync, - { + ) -> Result> { let name = DatabaseName::new(name.into()).unwrap(); let rules = DatabaseRules { diff --git a/server/src/resolver.rs b/server/src/resolver.rs deleted file mode 100644 index e77ea9bf41..0000000000 --- a/server/src/resolver.rs +++ /dev/null @@ -1,54 +0,0 @@ -use data_types::server_id::ServerId; -use std::collections::BTreeMap; - -/// A RemoteTemplate string is a remote connection template string. -/// Occurrences of the substring "{id}" in the template will be replaced -/// by the server ID. -#[derive(Debug)] -pub struct RemoteTemplate { - template: String, -} - -impl RemoteTemplate { - pub fn new(template: impl Into) -> Self { - let template = template.into(); - Self { template } - } -} - -/// A gRPC connection string. -pub type GrpcConnectionString = String; - -/// The Resolver provides a mapping between ServerId and GRpcConnectionString -#[derive(Debug)] -pub struct Resolver { - /// Map between remote IOx server IDs and management API connection strings. - remotes: BTreeMap, - - /// Static map between remote server IDs and hostnames based on a template - remote_template: Option, -} - -impl Resolver { - pub fn new(remote_template: Option) -> Self { - Self { - remotes: Default::default(), - remote_template, - } - } - - /// Get all registered remote servers. - pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> { - self.remotes.iter().map(|(&a, b)| (a, b.clone())).collect() - } - - /// Update given remote server. - pub fn update_remote(&mut self, id: ServerId, addr: GrpcConnectionString) { - self.remotes.insert(id, addr); - } - - /// Delete remote server by ID. - pub fn delete_remote(&mut self, id: ServerId) -> Option { - self.remotes.remove(&id) - } -} diff --git a/server/tests/write_buffer_delete.rs b/server/tests/write_buffer_delete.rs index c09be30957..d4ccd198e4 100644 --- a/server/tests/write_buffer_delete.rs +++ b/server/tests/write_buffer_delete.rs @@ -23,7 +23,6 @@ use query::frontend::sql::SqlQueryPlanner; use regex::Regex; use router::router::Router; use router::server::RouterServer; -use server::connection::test_helpers::TestConnectionManager; use server::rules::ProvidedDatabaseRules; use server::test_utils::{make_application, make_initialized_server}; use server::{Db, Server}; @@ -44,7 +43,7 @@ use write_buffer::mock::MockBufferSharedState; struct DistributedTest { router: Arc, - consumer: Arc>, + consumer: Arc, consumer_db: Arc, }