Merge pull request #3218 from influxdata/crepererum/remove_routing_from_database_mode_4
refactor: remove connection manager and resolver from `Server`pull/24376/head
commit
cfba3651d7
|
@ -14,7 +14,7 @@
|
|||
use data_types::{names::OrgBucketMappingError, DatabaseName};
|
||||
use influxdb_iox_client::format::QueryOutputFormat;
|
||||
use query::exec::ExecutionContextProvider;
|
||||
use server::{connection::ConnectionManager, Error};
|
||||
use server::Error;
|
||||
|
||||
// External crates
|
||||
use async_trait::async_trait;
|
||||
|
@ -162,10 +162,7 @@ impl From<server::Error> for ApplicationError {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<M> HttpDrivenDml for DatabaseServerType<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
impl HttpDrivenDml for DatabaseServerType {
|
||||
fn max_request_size(&self) -> usize {
|
||||
self.max_request_size
|
||||
}
|
||||
|
@ -222,13 +219,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn route_request<M>(
|
||||
server_type: &DatabaseServerType<M>,
|
||||
pub async fn route_request(
|
||||
server_type: &DatabaseServerType,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApplicationError>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
) -> Result<Response<Body>, ApplicationError> {
|
||||
match server_type
|
||||
.route_dml_http_request(req)
|
||||
.await
|
||||
|
@ -266,9 +260,9 @@ fn default_format() -> String {
|
|||
QueryOutputFormat::default().to_string()
|
||||
}
|
||||
|
||||
async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
||||
async fn query(
|
||||
req: Request<Body>,
|
||||
server_type: &DatabaseServerType<M>,
|
||||
server_type: &DatabaseServerType,
|
||||
) -> Result<Response<Body>, ApplicationError> {
|
||||
let server = &server_type.server;
|
||||
|
||||
|
@ -340,10 +334,7 @@ mod tests {
|
|||
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
|
||||
use object_store::ObjectStore;
|
||||
use schema::selection::Selection;
|
||||
use server::{
|
||||
connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, ApplicationState,
|
||||
Server,
|
||||
};
|
||||
use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, Server};
|
||||
use trace::RingBufferTraceCollector;
|
||||
|
||||
fn make_application() -> Arc<ApplicationState> {
|
||||
|
@ -354,12 +345,8 @@ mod tests {
|
|||
))
|
||||
}
|
||||
|
||||
fn make_server(application: Arc<ApplicationState>) -> Arc<Server<ConnectionManagerImpl>> {
|
||||
Arc::new(Server::new(
|
||||
ConnectionManagerImpl::new(),
|
||||
application,
|
||||
Default::default(),
|
||||
))
|
||||
fn make_server(application: Arc<ApplicationState>) -> Arc<Server> {
|
||||
Arc::new(Server::new(application, Default::default()))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -377,10 +364,7 @@ mod tests {
|
|||
assert_tracing(setup_server().await).await;
|
||||
}
|
||||
|
||||
async fn assert_dbwrite(
|
||||
test_server: TestServer<DatabaseServerType<ConnectionManagerImpl>>,
|
||||
write: DmlWrite,
|
||||
) {
|
||||
async fn assert_dbwrite(test_server: TestServer<DatabaseServerType>, write: DmlWrite) {
|
||||
let (table_name, mutable_batch) = write.tables().next().unwrap();
|
||||
|
||||
let test_db = test_server
|
||||
|
@ -529,10 +513,7 @@ mod tests {
|
|||
/// Sets up a test database with some data for testing the query endpoint
|
||||
/// returns a client for communicating with the server, and the server
|
||||
/// endpoint
|
||||
async fn setup_test_data() -> (
|
||||
Client,
|
||||
TestServer<DatabaseServerType<ConnectionManagerImpl>>,
|
||||
) {
|
||||
async fn setup_test_data() -> (Client, TestServer<DatabaseServerType>) {
|
||||
let test_server = setup_server().await;
|
||||
|
||||
let client = Client::new();
|
||||
|
@ -689,7 +670,7 @@ mod tests {
|
|||
}
|
||||
|
||||
/// return a test server and the url to contact it for `MyOrg_MyBucket`
|
||||
async fn setup_server() -> TestServer<DatabaseServerType<ConnectionManagerImpl>> {
|
||||
async fn setup_server() -> TestServer<DatabaseServerType> {
|
||||
let application = make_application();
|
||||
|
||||
let app_server = make_server(Arc::clone(&application));
|
||||
|
|
|
@ -5,7 +5,7 @@ use futures::{future::FusedFuture, FutureExt};
|
|||
use hyper::{Body, Request, Response};
|
||||
use metric::Registry;
|
||||
use observability_deps::tracing::{error, info};
|
||||
use server::{connection::ConnectionManager, ApplicationState, Server};
|
||||
use server::{ApplicationState, Server};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use trace::TraceCollector;
|
||||
|
||||
|
@ -25,25 +25,19 @@ pub use self::http::ApplicationError;
|
|||
use super::common_state::CommonServerState;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseServerType<M>
|
||||
where
|
||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
||||
{
|
||||
pub struct DatabaseServerType {
|
||||
pub application: Arc<ApplicationState>,
|
||||
pub server: Arc<Server<M>>,
|
||||
pub server: Arc<Server>,
|
||||
pub lp_metrics: Arc<LineProtocolMetrics>,
|
||||
pub max_request_size: usize,
|
||||
pub serving_readiness: ServingReadiness,
|
||||
shutdown: CancellationToken,
|
||||
}
|
||||
|
||||
impl<M> DatabaseServerType<M>
|
||||
where
|
||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
||||
{
|
||||
impl DatabaseServerType {
|
||||
pub fn new(
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
server: Arc<Server>,
|
||||
common_state: &CommonServerState,
|
||||
) -> Self {
|
||||
let lp_metrics = Arc::new(LineProtocolMetrics::new(
|
||||
|
@ -62,10 +56,7 @@ where
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<M> ServerType for DatabaseServerType<M>
|
||||
where
|
||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
||||
{
|
||||
impl ServerType for DatabaseServerType {
|
||||
type RouteError = ApplicationError;
|
||||
|
||||
fn metric_registry(&self) -> Arc<Registry> {
|
||||
|
@ -132,7 +123,7 @@ mod tests {
|
|||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use futures::pin_mut;
|
||||
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
||||
use server::{connection::ConnectionManagerImpl, rules::ProvidedDatabaseRules};
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64};
|
||||
use structopt::StructOpt;
|
||||
use tokio::task::JoinHandle;
|
||||
|
@ -157,7 +148,7 @@ mod tests {
|
|||
async fn test_serve(
|
||||
config: RunConfig,
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<ConnectionManagerImpl>>,
|
||||
server: Arc<Server>,
|
||||
) {
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address.into())
|
||||
.await
|
||||
|
@ -348,7 +339,7 @@ mod tests {
|
|||
collector: &Arc<T>,
|
||||
) -> (
|
||||
SocketAddr,
|
||||
Arc<Server<ConnectionManagerImpl>>,
|
||||
Arc<Server>,
|
||||
JoinHandle<crate::influxdb_ioxd::Result<()>>,
|
||||
) {
|
||||
let config = test_config(Some(23));
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use data_types::non_empty::NonEmptyString;
|
||||
|
@ -6,20 +5,17 @@ use data_types::DatabaseName;
|
|||
use dml::{DmlDelete, DmlMeta};
|
||||
use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField};
|
||||
use generated_types::influxdata::iox::delete::v1::*;
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
use server::Server;
|
||||
use tonic::Response;
|
||||
|
||||
struct DeleteService<M: ConnectionManager> {
|
||||
server: Arc<Server<M>>,
|
||||
struct DeleteService {
|
||||
server: Arc<Server>,
|
||||
}
|
||||
|
||||
use super::error::{default_db_error_handler, default_server_error_handler};
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> delete_service_server::DeleteService for DeleteService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
impl delete_service_server::DeleteService for DeleteService {
|
||||
async fn delete(
|
||||
&self,
|
||||
request: tonic::Request<DeleteRequest>,
|
||||
|
@ -50,11 +46,8 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
server: Arc<Server<M>>,
|
||||
) -> delete_service_server::DeleteServiceServer<impl delete_service_server::DeleteService>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
pub fn make_server(
|
||||
server: Arc<Server>,
|
||||
) -> delete_service_server::DeleteServiceServer<impl delete_service_server::DeleteService> {
|
||||
delete_service_server::DeleteServiceServer::new(DeleteService { server })
|
||||
}
|
||||
|
|
|
@ -3,12 +3,12 @@ use generated_types::{
|
|||
google::{FieldViolation, NotFound},
|
||||
influxdata::iox::deployment::v1::*,
|
||||
};
|
||||
use server::{connection::ConnectionManager, Error, Server};
|
||||
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
|
||||
use server::{Error, Server};
|
||||
use std::{convert::TryFrom, sync::Arc};
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
struct DeploymentService<M: ConnectionManager> {
|
||||
server: Arc<Server<M>>,
|
||||
struct DeploymentService {
|
||||
server: Arc<Server>,
|
||||
serving_readiness: ServingReadiness,
|
||||
}
|
||||
|
||||
|
@ -16,10 +16,7 @@ use super::error::default_server_error_handler;
|
|||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> deployment_service_server::DeploymentService for DeploymentService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
impl deployment_service_server::DeploymentService for DeploymentService {
|
||||
async fn get_server_id(
|
||||
&self,
|
||||
_: Request<GetServerIdRequest>,
|
||||
|
@ -69,15 +66,12 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
server: Arc<Server<M>>,
|
||||
pub fn make_server(
|
||||
server: Arc<Server>,
|
||||
serving_readiness: ServingReadiness,
|
||||
) -> deployment_service_server::DeploymentServiceServer<
|
||||
impl deployment_service_server::DeploymentService,
|
||||
>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
> {
|
||||
deployment_service_server::DeploymentServiceServer::new(DeploymentService {
|
||||
server,
|
||||
serving_readiness,
|
||||
|
|
|
@ -25,7 +25,7 @@ use tonic::{Request, Response, Streaming};
|
|||
use data_types::{DatabaseName, DatabaseNameError};
|
||||
use observability_deps::tracing::{info, warn};
|
||||
use query::exec::{ExecutionContextProvider, IOxExecutionContext};
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
use server::Server;
|
||||
|
||||
use super::error::default_server_error_handler;
|
||||
use crate::influxdb_ioxd::planner::Planner;
|
||||
|
@ -125,22 +125,16 @@ struct ReadInfo {
|
|||
|
||||
/// Concrete implementation of the gRPC Arrow Flight Service API
|
||||
#[derive(Debug)]
|
||||
struct FlightService<M: ConnectionManager> {
|
||||
server: Arc<Server<M>>,
|
||||
struct FlightService {
|
||||
server: Arc<Server>,
|
||||
}
|
||||
|
||||
pub fn make_server<M>(server: Arc<Server<M>>) -> FlightServer<impl Flight>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
pub fn make_server(server: Arc<Server>) -> FlightServer<impl Flight> {
|
||||
FlightServer::new(FlightService { server })
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M: ConnectionManager> Flight for FlightService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
impl Flight for FlightService {
|
||||
type HandshakeStream = TonicStream<HandshakeResponse>;
|
||||
type ListFlightsStream = TonicStream<FlightInfo>;
|
||||
type DoGetStream = TonicStream<FlightData>;
|
||||
|
|
|
@ -4,16 +4,14 @@ use generated_types::{
|
|||
influxdata::iox::management::v1::{Error as ProtobufError, *},
|
||||
};
|
||||
use query::QueryDatabase;
|
||||
use server::{
|
||||
connection::ConnectionManager, rules::ProvidedDatabaseRules, ApplicationState, Error, Server,
|
||||
};
|
||||
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
|
||||
use server::{rules::ProvidedDatabaseRules, ApplicationState, Error, Server};
|
||||
use std::{convert::TryFrom, sync::Arc};
|
||||
use tonic::{Request, Response, Status};
|
||||
use uuid::Uuid;
|
||||
|
||||
struct ManagementService<M: ConnectionManager> {
|
||||
struct ManagementService {
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
server: Arc<Server>,
|
||||
}
|
||||
|
||||
use super::error::{
|
||||
|
@ -21,10 +19,7 @@ use super::error::{
|
|||
};
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> management_service_server::ManagementService for ManagementService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
impl management_service_server::ManagementService for ManagementService {
|
||||
async fn list_databases(
|
||||
&self,
|
||||
request: Request<ListDatabasesRequest>,
|
||||
|
@ -523,15 +518,12 @@ fn format_rules(provided_rules: Arc<ProvidedDatabaseRules>, omit_defaults: bool)
|
|||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
pub fn make_server(
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
server: Arc<Server>,
|
||||
) -> management_service_server::ManagementServiceServer<
|
||||
impl management_service_server::ManagementService,
|
||||
>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
> {
|
||||
management_service_server::ManagementServiceServer::new(ManagementService {
|
||||
application,
|
||||
server,
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use server::connection::ConnectionManager;
|
||||
|
||||
use crate::influxdb_ioxd::{
|
||||
rpc::{add_gated_service, add_service, serve_builder, setup_builder, RpcBuilderInput},
|
||||
server_type::{database::DatabaseServerType, RpcError},
|
||||
|
@ -16,13 +14,10 @@ mod operations;
|
|||
mod storage;
|
||||
mod write_pb;
|
||||
|
||||
pub async fn server_grpc<M>(
|
||||
server_type: Arc<DatabaseServerType<M>>,
|
||||
pub async fn server_grpc(
|
||||
server_type: Arc<DatabaseServerType>,
|
||||
builder_input: RpcBuilderInput,
|
||||
) -> Result<(), RpcError>
|
||||
where
|
||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
||||
{
|
||||
) -> Result<(), RpcError> {
|
||||
let builder = setup_builder!(builder_input, server_type);
|
||||
|
||||
add_gated_service!(
|
||||
|
|
|
@ -2,21 +2,17 @@ use data_types::DatabaseName;
|
|||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
||||
use generated_types::google::{FieldViolation, FieldViolationExt};
|
||||
use generated_types::influxdata::pbdata::v1::*;
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
use std::fmt::Debug;
|
||||
use server::Server;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::error::{default_database_write_error_handler, default_server_error_handler};
|
||||
|
||||
struct PBWriteService<M: ConnectionManager> {
|
||||
server: Arc<Server<M>>,
|
||||
struct PBWriteService {
|
||||
server: Arc<Server>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> write_service_server::WriteService for PBWriteService<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
impl write_service_server::WriteService for PBWriteService {
|
||||
async fn write(
|
||||
&self,
|
||||
request: tonic::Request<WriteRequest>,
|
||||
|
@ -53,11 +49,8 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
server: Arc<Server<M>>,
|
||||
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
pub fn make_server(
|
||||
server: Arc<Server>,
|
||||
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
|
||||
write_service_server::WriteServiceServer::new(PBWriteService { server })
|
||||
}
|
||||
|
|
|
@ -2,9 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use object_store::ObjectStore;
|
||||
use observability_deps::tracing::warn;
|
||||
use server::{
|
||||
connection::ConnectionManagerImpl, ApplicationState, RemoteTemplate, Server, ServerConfig,
|
||||
};
|
||||
use server::{ApplicationState, Server, ServerConfig};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use trace::TraceCollector;
|
||||
|
||||
|
@ -47,18 +45,13 @@ pub async fn make_application(
|
|||
)))
|
||||
}
|
||||
|
||||
pub fn make_server(
|
||||
application: Arc<ApplicationState>,
|
||||
config: &Config,
|
||||
) -> Arc<Server<ConnectionManagerImpl>> {
|
||||
pub fn make_server(application: Arc<ApplicationState>, config: &Config) -> Arc<Server> {
|
||||
let server_config = ServerConfig {
|
||||
remote_template: config.remote_template.clone().map(RemoteTemplate::new),
|
||||
wipe_catalog_on_error: config.wipe_catalog_on_error.into(),
|
||||
skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(),
|
||||
};
|
||||
|
||||
let connection_manager = ConnectionManagerImpl::new();
|
||||
let app_server = Arc::new(Server::new(connection_manager, application, server_config));
|
||||
let app_server = Arc::new(Server::new(application, server_config));
|
||||
|
||||
// if this ID isn't set the server won't be usable until this is set via an API
|
||||
// call
|
||||
|
|
|
@ -1,185 +0,0 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cache_loader_async::cache_api::LoadingCache;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use dml::DmlWrite;
|
||||
use generated_types::influxdata::pbdata::v1::WriteRequest;
|
||||
use influxdb_iox_client::{connection::Builder, write};
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
type RemoteServerError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum ConnectionManagerError {
|
||||
#[snafu(display("cannot connect to remote: {}", source))]
|
||||
RemoteServerConnectError { source: RemoteServerError },
|
||||
#[snafu(display("cannot write to remote: {}", source))]
|
||||
RemoteServerWriteError { source: write::WriteError },
|
||||
}
|
||||
|
||||
/// The `Server` will ask the `ConnectionManager` for connections to a specific
|
||||
/// remote server. These connections can be used to communicate with other
|
||||
/// servers. This is implemented as a trait for dependency injection in testing.
|
||||
#[async_trait]
|
||||
pub trait ConnectionManager {
|
||||
type RemoteServer: RemoteServer + Send + Sync + 'static;
|
||||
|
||||
async fn remote_server(
|
||||
&self,
|
||||
connect: &str,
|
||||
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError>;
|
||||
}
|
||||
|
||||
/// The `RemoteServer` represents the API for replicating, subscribing, and
|
||||
/// querying other servers.
|
||||
#[async_trait]
|
||||
pub trait RemoteServer {
|
||||
/// Sends a [`DmlWrite`] to the remote server. An IOx server acting as a
|
||||
/// router/sharder will call this method to send entries to remotes.
|
||||
async fn write(&self, db: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError>;
|
||||
}
|
||||
|
||||
/// The connection manager maps a host identifier to a remote server.
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionManagerImpl {
|
||||
cache: LoadingCache<String, Arc<RemoteServerImpl>, CacheFillError>,
|
||||
}
|
||||
|
||||
// Error must be Clone because LoadingCache requires so.
|
||||
#[derive(Debug, Snafu, Clone)]
|
||||
pub enum CacheFillError {
|
||||
#[snafu(display("gRPC error: {}", source))]
|
||||
GrpcError {
|
||||
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ConnectionManagerImpl {
|
||||
pub fn new() -> Self {
|
||||
let cache = LoadingCache::new(Self::cached_remote_server);
|
||||
Self { cache }
|
||||
}
|
||||
|
||||
async fn cached_remote_server(
|
||||
connect: String,
|
||||
) -> Result<Arc<RemoteServerImpl>, CacheFillError> {
|
||||
let connection = Builder::default()
|
||||
.build(&connect)
|
||||
.await
|
||||
.map_err(|e| Arc::new(e) as _)
|
||||
.context(GrpcError)?;
|
||||
let client = write::Client::new(connection);
|
||||
Ok(Arc::new(RemoteServerImpl { client }))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConnectionManagerImpl {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectionManager for ConnectionManagerImpl {
|
||||
type RemoteServer = RemoteServerImpl;
|
||||
|
||||
async fn remote_server(
|
||||
&self,
|
||||
connect: &str,
|
||||
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
|
||||
let ret = self
|
||||
.cache
|
||||
.get_with_meta(connect.to_string())
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(RemoteServerConnectError)?;
|
||||
debug!(was_cached=%ret.cached, %connect, "getting remote connection");
|
||||
Ok(ret.result)
|
||||
}
|
||||
}
|
||||
|
||||
/// An implementation for communicating with other IOx servers. This should
|
||||
/// be moved into and implemented in an influxdb_iox_client create at a later
|
||||
/// date.
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteServerImpl {
|
||||
client: write::Client,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RemoteServer for RemoteServerImpl {
|
||||
/// Sends a write to the remote server. An IOx server acting as a
|
||||
/// router/sharder will call this method to send entries to remotes.
|
||||
async fn write(&self, db_name: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError> {
|
||||
let data = mutable_batch_pb::encode::encode_write(db_name, write);
|
||||
self.client
|
||||
.clone() // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
|
||||
.write_pb(WriteRequest {
|
||||
database_batch: Some(data),
|
||||
})
|
||||
.await
|
||||
.context(RemoteServerWriteError)
|
||||
}
|
||||
}
|
||||
|
||||
pub mod test_helpers {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use super::*;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TestConnectionManager {
|
||||
pub remotes: BTreeMap<String, Arc<TestRemoteServer>>,
|
||||
}
|
||||
|
||||
impl Default for TestConnectionManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TestConnectionManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
remotes: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectionManager for TestConnectionManager {
|
||||
type RemoteServer = TestRemoteServer;
|
||||
|
||||
async fn remote_server(
|
||||
&self,
|
||||
id: &str,
|
||||
) -> Result<Arc<TestRemoteServer>, ConnectionManagerError> {
|
||||
#[derive(Debug, Snafu)]
|
||||
enum TestRemoteError {
|
||||
#[snafu(display("remote not found"))]
|
||||
NotFound,
|
||||
}
|
||||
Ok(Arc::clone(self.remotes.get(id).ok_or_else(|| {
|
||||
ConnectionManagerError::RemoteServerConnectError {
|
||||
source: Box::new(TestRemoteError::NotFound),
|
||||
}
|
||||
})?))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TestRemoteServer {
|
||||
pub written: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<'a> RemoteServer for TestRemoteServer {
|
||||
async fn write(&self, _db: &str, _write: &DmlWrite) -> Result<(), ConnectionManagerError> {
|
||||
self.written.store(true, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
|
@ -70,7 +70,6 @@
|
|||
|
||||
use ::lifecycle::{LockableChunk, LockablePartition};
|
||||
use async_trait::async_trait;
|
||||
use connection::ConnectionManager;
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkId,
|
||||
detailed_database::ActiveDatabase,
|
||||
|
@ -87,7 +86,6 @@ use internal_types::freezable::Freezable;
|
|||
use iox_object_store::IoxObjectStore;
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use parking_lot::RwLock;
|
||||
use resolver::Resolver;
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use tokio::{sync::Notify, task::JoinError};
|
||||
|
@ -98,14 +96,11 @@ use uuid::Uuid;
|
|||
pub use application::ApplicationState;
|
||||
pub use db::Db;
|
||||
pub use job::JobRegistry;
|
||||
pub use resolver::{GrpcConnectionString, RemoteTemplate};
|
||||
|
||||
mod application;
|
||||
pub mod connection;
|
||||
pub mod database;
|
||||
pub mod db;
|
||||
mod job;
|
||||
mod resolver;
|
||||
|
||||
pub mod rules;
|
||||
use rules::{PersistedDatabaseRules, ProvidedDatabaseRules};
|
||||
|
@ -233,8 +228,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
|
|||
/// Configuration options for `Server`
|
||||
#[derive(Debug)]
|
||||
pub struct ServerConfig {
|
||||
pub remote_template: Option<RemoteTemplate>,
|
||||
|
||||
pub wipe_catalog_on_error: bool,
|
||||
|
||||
pub skip_replay_and_seek_instead: bool,
|
||||
|
@ -243,7 +236,6 @@ pub struct ServerConfig {
|
|||
impl Default for ServerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
remote_template: None,
|
||||
wipe_catalog_on_error: false,
|
||||
skip_replay_and_seek_instead: false,
|
||||
}
|
||||
|
@ -254,20 +246,15 @@ impl Default for ServerConfig {
|
|||
/// well as how they communicate with other servers. Each server will have one
|
||||
/// of these structs, which keeps track of all replication and query rules.
|
||||
#[derive(Debug)]
|
||||
pub struct Server<M: ConnectionManager> {
|
||||
connection_manager: Arc<M>,
|
||||
|
||||
pub struct Server {
|
||||
/// Future that resolves when the background worker exits
|
||||
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
|
||||
|
||||
/// Resolver for mapping ServerId to gRPC connection strings
|
||||
resolver: RwLock<Resolver>,
|
||||
|
||||
/// State shared with the background worker
|
||||
shared: Arc<ServerShared>,
|
||||
}
|
||||
|
||||
impl<M: ConnectionManager> Drop for Server<M> {
|
||||
impl Drop for Server {
|
||||
fn drop(&mut self) {
|
||||
if !self.shared.shutdown.is_cancelled() {
|
||||
warn!("server dropped without calling shutdown()");
|
||||
|
@ -466,15 +453,8 @@ impl ServerStateInitialized {
|
|||
}
|
||||
}
|
||||
|
||||
impl<M> Server<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync,
|
||||
{
|
||||
pub fn new(
|
||||
connection_manager: M,
|
||||
application: Arc<ApplicationState>,
|
||||
config: ServerConfig,
|
||||
) -> Self {
|
||||
impl Server {
|
||||
pub fn new(application: Arc<ApplicationState>, config: ServerConfig) -> Self {
|
||||
let shared = Arc::new(ServerShared {
|
||||
shutdown: Default::default(),
|
||||
application,
|
||||
|
@ -488,12 +468,7 @@ where
|
|||
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
|
||||
let join = handle.map_err(Arc::new).boxed().shared();
|
||||
|
||||
Self {
|
||||
shared,
|
||||
join,
|
||||
connection_manager: Arc::new(connection_manager),
|
||||
resolver: RwLock::new(Resolver::new(config.remote_template)),
|
||||
}
|
||||
Self { shared, join }
|
||||
}
|
||||
|
||||
/// sets the id of the server, which is used for replication and the base
|
||||
|
@ -860,18 +835,6 @@ where
|
|||
.context(CanNotUpdateRules { db_name })?)
|
||||
}
|
||||
|
||||
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
|
||||
self.resolver.read().remotes_sorted()
|
||||
}
|
||||
|
||||
pub fn update_remote(&self, id: ServerId, addr: GrpcConnectionString) {
|
||||
self.resolver.write().update_remote(id, addr)
|
||||
}
|
||||
|
||||
pub fn delete_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
|
||||
self.resolver.write().delete_remote(id)
|
||||
}
|
||||
|
||||
/// Closes a chunk and starts moving its data to the read buffer, as a
|
||||
/// background job, dropping when complete.
|
||||
pub fn close_chunk(
|
||||
|
@ -1115,10 +1078,7 @@ async fn maybe_initialize_server(shared: &ServerShared) {
|
|||
|
||||
/// TODO: Revisit this trait's API
|
||||
#[async_trait]
|
||||
impl<M> DatabaseStore for Server<M>
|
||||
where
|
||||
M: ConnectionManager + std::fmt::Debug + Send + Sync,
|
||||
{
|
||||
impl DatabaseStore for Server {
|
||||
type Database = Db;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -1148,10 +1108,7 @@ where
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<M> Server<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync,
|
||||
{
|
||||
impl Server {
|
||||
/// For tests: list of database names in this server, regardless
|
||||
/// of their initialization state
|
||||
fn db_names_sorted(&self) -> Vec<String> {
|
||||
|
@ -1218,7 +1175,6 @@ async fn database_name_from_rules_file(
|
|||
|
||||
pub mod test_utils {
|
||||
use super::*;
|
||||
use crate::connection::test_helpers::TestConnectionManager;
|
||||
use object_store::ObjectStore;
|
||||
|
||||
/// Create a new [`ApplicationState`] with an in-memory object store
|
||||
|
@ -1231,12 +1187,8 @@ pub mod test_utils {
|
|||
}
|
||||
|
||||
/// Creates a new server with the provided [`ApplicationState`]
|
||||
pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
|
||||
Arc::new(Server::new(
|
||||
TestConnectionManager::new(),
|
||||
application,
|
||||
Default::default(),
|
||||
))
|
||||
pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server> {
|
||||
Arc::new(Server::new(application, Default::default()))
|
||||
}
|
||||
|
||||
/// Creates a new server with the provided [`ApplicationState`]
|
||||
|
@ -1245,7 +1197,7 @@ pub mod test_utils {
|
|||
pub async fn make_initialized_server(
|
||||
server_id: ServerId,
|
||||
application: Arc<ApplicationState>,
|
||||
) -> Arc<Server<TestConnectionManager>> {
|
||||
) -> Arc<Server> {
|
||||
let server = make_server(application);
|
||||
server.set_id(server_id).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
@ -1457,13 +1409,10 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
async fn create_simple_database<M>(
|
||||
server: &Server<M>,
|
||||
async fn create_simple_database(
|
||||
server: &Server,
|
||||
name: impl Into<String> + Send,
|
||||
) -> Result<Arc<Database>>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync,
|
||||
{
|
||||
) -> Result<Arc<Database>> {
|
||||
let name = DatabaseName::new(name.into()).unwrap();
|
||||
|
||||
let rules = DatabaseRules {
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
use data_types::server_id::ServerId;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
/// A RemoteTemplate string is a remote connection template string.
|
||||
/// Occurrences of the substring "{id}" in the template will be replaced
|
||||
/// by the server ID.
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteTemplate {
|
||||
template: String,
|
||||
}
|
||||
|
||||
impl RemoteTemplate {
|
||||
pub fn new(template: impl Into<String>) -> Self {
|
||||
let template = template.into();
|
||||
Self { template }
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC connection string.
|
||||
pub type GrpcConnectionString = String;
|
||||
|
||||
/// The Resolver provides a mapping between ServerId and GRpcConnectionString
|
||||
#[derive(Debug)]
|
||||
pub struct Resolver {
|
||||
/// Map between remote IOx server IDs and management API connection strings.
|
||||
remotes: BTreeMap<ServerId, GrpcConnectionString>,
|
||||
|
||||
/// Static map between remote server IDs and hostnames based on a template
|
||||
remote_template: Option<RemoteTemplate>,
|
||||
}
|
||||
|
||||
impl Resolver {
|
||||
pub fn new(remote_template: Option<RemoteTemplate>) -> Self {
|
||||
Self {
|
||||
remotes: Default::default(),
|
||||
remote_template,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all registered remote servers.
|
||||
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
|
||||
self.remotes.iter().map(|(&a, b)| (a, b.clone())).collect()
|
||||
}
|
||||
|
||||
/// Update given remote server.
|
||||
pub fn update_remote(&mut self, id: ServerId, addr: GrpcConnectionString) {
|
||||
self.remotes.insert(id, addr);
|
||||
}
|
||||
|
||||
/// Delete remote server by ID.
|
||||
pub fn delete_remote(&mut self, id: ServerId) -> Option<GrpcConnectionString> {
|
||||
self.remotes.remove(&id)
|
||||
}
|
||||
}
|
|
@ -23,7 +23,6 @@ use query::frontend::sql::SqlQueryPlanner;
|
|||
use regex::Regex;
|
||||
use router::router::Router;
|
||||
use router::server::RouterServer;
|
||||
use server::connection::test_helpers::TestConnectionManager;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use server::test_utils::{make_application, make_initialized_server};
|
||||
use server::{Db, Server};
|
||||
|
@ -44,7 +43,7 @@ use write_buffer::mock::MockBufferSharedState;
|
|||
struct DistributedTest {
|
||||
router: Arc<Router>,
|
||||
|
||||
consumer: Arc<Server<TestConnectionManager>>,
|
||||
consumer: Arc<Server>,
|
||||
consumer_db: Arc<Db>,
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue