refactor: move connection manager to separate module (#2142)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-29 13:58:15 +01:00 committed by GitHub
parent 4a44b14d4d
commit df3b162475
2 changed files with 221 additions and 207 deletions

server/src/connection.rs (new file, 180 additions)

@@ -0,0 +1,180 @@
use std::sync::Arc;

use async_trait::async_trait;
use cache_loader_async::cache_api::LoadingCache;
use entry::Entry;
use influxdb_iox_client::{connection::Builder, write};
use observability_deps::tracing::debug;
use snafu::{ResultExt, Snafu};

type RemoteServerError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, Snafu)]
pub enum ConnectionManagerError {
    #[snafu(display("cannot connect to remote: {}", source))]
    RemoteServerConnectError { source: RemoteServerError },

    #[snafu(display("cannot write to remote: {}", source))]
    RemoteServerWriteError { source: write::WriteError },
}
/// The `Server` will ask the `ConnectionManager` for connections to a specific
/// remote server. These connections can be used to communicate with other
/// servers. This is implemented as a trait for dependency injection in testing.
#[async_trait]
pub trait ConnectionManager {
    type RemoteServer: RemoteServer + Send + Sync + 'static;

    async fn remote_server(
        &self,
        connect: &str,
    ) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError>;
}
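// A minimal sketch (the helper below is hypothetical, not defined in this
// crate) of how calling code can stay generic over `ConnectionManager`, so
// tests may inject `test_helpers::TestConnectionManager` in place of
// `ConnectionManagerImpl`:
//
//     async fn forward_entry<M: ConnectionManager>(
//         manager: &M,
//         connect: &str,
//         db_name: &str,
//         entry: Entry,
//     ) -> Result<(), ConnectionManagerError> {
//         let remote = manager.remote_server(connect).await?;
//         remote.write_entry(db_name, entry).await
//     }
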
/// The `RemoteServer` represents the API for replicating, subscribing, and
/// querying other servers.
#[async_trait]
pub trait RemoteServer {
    /// Sends an Entry to the remote server. An IOx server acting as a
    /// router/sharder will call this method to send entries to remotes.
    async fn write_entry(&self, db: &str, entry: Entry) -> Result<(), ConnectionManagerError>;
}
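// A sketch of driving any `RemoteServer` directly; `lp_to_entry` is the
// line-protocol helper from `entry::test_helpers` used by this crate's tests
// (its exact signature is assumed here), and `send_line` is hypothetical:
//
//     async fn send_line(
//         remote: &impl RemoteServer,
//         db: &str,
//         lp: &str,
//     ) -> Result<(), ConnectionManagerError> {
//         let entry = entry::test_helpers::lp_to_entry(lp);
//         remote.write_entry(db, entry).await
//     }
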
/// The connection manager maps a host identifier to a remote server.
#[derive(Debug)]
pub struct ConnectionManagerImpl {
    cache: LoadingCache<String, Arc<RemoteServerImpl>, CacheFillError>,
}

// The error type must be `Clone` because `LoadingCache` requires it.
#[derive(Debug, Snafu, Clone)]
pub enum CacheFillError {
    #[snafu(display("gRPC error: {}", source))]
    GrpcError {
        source: Arc<dyn std::error::Error + Send + Sync + 'static>,
    },
}
impl ConnectionManagerImpl {
    pub fn new() -> Self {
        let (cache, _) = LoadingCache::new(Self::cached_remote_server);
        Self { cache }
    }

    async fn cached_remote_server(
        connect: String,
    ) -> Result<Arc<RemoteServerImpl>, CacheFillError> {
        let connection = Builder::default()
            .build(&connect)
            .await
            .map_err(|e| Arc::new(e) as _)
            .context(GrpcError)?;
        let client = write::Client::new(connection);
        Ok(Arc::new(RemoteServerImpl { client }))
    }
}

impl Default for ConnectionManagerImpl {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl ConnectionManager for ConnectionManagerImpl {
    type RemoteServer = RemoteServerImpl;

    async fn remote_server(
        &self,
        connect: &str,
    ) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
        let ret = self
            .cache
            .get_with_meta(connect.to_string())
            .await
            .map_err(|e| Box::new(e) as _)
            .context(RemoteServerConnectError)?;
        debug!(was_cached=%ret.cached, %connect, "getting remote connection");
        Ok(ret.result)
    }
}
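// A usage sketch (made-up address, assumes an IOx server is listening there
// and that the surrounding code is async): repeated lookups for the same
// connection string are served by the `LoadingCache`, so both handles point
// at the same `RemoteServerImpl`.
//
//     let manager = ConnectionManagerImpl::new();
//     let a = manager.remote_server("http://127.0.0.1:8082").await?;
//     let b = manager.remote_server("http://127.0.0.1:8082").await?;
//     assert!(Arc::ptr_eq(&a, &b)); // second call was answered from the cache
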
/// An implementation for communicating with other IOx servers. This should
/// be moved into and implemented in an influxdb_iox_client crate at a later
/// date.
#[derive(Debug)]
pub struct RemoteServerImpl {
    client: write::Client,
}

#[async_trait]
impl RemoteServer for RemoteServerImpl {
    /// Sends an Entry to the remote server. An IOx server acting as a
    /// router/sharder will call this method to send entries to remotes.
    async fn write_entry(&self, db_name: &str, entry: Entry) -> Result<(), ConnectionManagerError> {
        self.client
            .clone() // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
            .write_entry(db_name, entry)
            .await
            .context(RemoteServerWriteError)
    }
}
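// Because `write_entry` only needs `&self` and clones the tonic client
// internally (cheap, per the link above), one remote handle can serve
// concurrent writes. A hypothetical sketch, with `manager`, `connect`,
// `entry_1`, and `entry_2` assumed to exist in the caller:
//
//     let remote = manager.remote_server(connect).await?;
//     let (r1, r2) = tokio::join!(
//         remote.write_entry("db_one", entry_1),
//         remote.write_entry("db_two", entry_2),
//     );
//     r1?;
//     r2?;
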
#[cfg(test)]
pub(crate) mod test_helpers {
    use std::collections::BTreeMap;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;

    #[derive(Debug)]
    pub struct TestConnectionManager {
        pub remotes: BTreeMap<String, Arc<TestRemoteServer>>,
    }

    impl TestConnectionManager {
        pub fn new() -> Self {
            Self {
                remotes: BTreeMap::new(),
            }
        }
    }

    #[async_trait]
    impl ConnectionManager for TestConnectionManager {
        type RemoteServer = TestRemoteServer;

        async fn remote_server(
            &self,
            id: &str,
        ) -> Result<Arc<TestRemoteServer>, ConnectionManagerError> {
            #[derive(Debug, Snafu)]
            enum TestRemoteError {
                #[snafu(display("remote not found"))]
                NotFound,
            }
            Ok(Arc::clone(self.remotes.get(id).ok_or_else(|| {
                ConnectionManagerError::RemoteServerConnectError {
                    source: Box::new(TestRemoteError::NotFound),
                }
            })?))
        }
    }

    #[derive(Debug)]
    pub struct TestRemoteServer {
        pub written: Arc<AtomicBool>,
    }

    #[async_trait]
    impl<'a> RemoteServer for TestRemoteServer {
        async fn write_entry(
            &self,
            _db: &str,
            _entry: Entry,
        ) -> Result<(), ConnectionManagerError> {
            self.written.store(true, Ordering::Relaxed);
            Ok(())
        }
    }
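
    // A hypothetical test wiring these fakes together. The remote is
    // registered under a made-up identifier, and `written` records that
    // `write_entry` was called (the `entry` value would come from e.g.
    // `entry::test_helpers::lp_to_entry`):
    //
    //     let written = Arc::new(AtomicBool::new(false));
    //     let mut manager = TestConnectionManager::new();
    //     manager.remotes.insert(
    //         "remote-1".to_string(),
    //         Arc::new(TestRemoteServer { written: Arc::clone(&written) }),
    //     );
    //     let remote = manager.remote_server("remote-1").await.unwrap();
    //     remote.write_entry("db", entry).await.unwrap();
    //     assert!(written.load(Ordering::Relaxed));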
}

server/src/lib.rs

@@ -68,16 +68,14 @@
clippy::future_not_send
)]
use std::collections::HashMap;
use std::convert::{Infallible, TryInto};
use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use db::load::create_preserved_catalog;
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::{Mutex, RwLockUpgradableReadGuard};
use snafu::{OptionExt, ResultExt, Snafu};
use config::{object_store_path_for_database_config, Config, GRpcConnectionString};
use data_types::database_rules::ShardConfig;
use data_types::{
database_rules::{
DatabaseRules, NodeGroup, RoutingRules, ShardId, Sink, WriteBufferConnection,
@@ -87,28 +85,28 @@ use data_types::{
server_id::ServerId,
{DatabaseName, DatabaseNameError},
};
use db::load::create_preserved_catalog;
use entry::{lines_to_sharded_entries, pb_to_entry, Entry, ShardedEntry};
use generated_types::database_rules::encode_database_rules;
use generated_types::influxdata::transfer::column::v1 as pb;
use influxdb_line_protocol::ParsedLine;
use lifecycle::LockableChunk;
use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry};
use object_store::{ObjectStore, ObjectStoreApi};
use parking_lot::RwLock;
use observability_deps::tracing::{error, info, warn};
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use query::{exec::Executor, DatabaseStore};
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt, Snafu};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
use write_buffer::config::WriteBufferConfig;
pub use crate::config::RemoteTemplate;
use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString};
use cache_loader_async::cache_api::LoadingCache;
use data_types::database_rules::ShardConfig;
pub use config::RemoteTemplate;
pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer};
pub use db::Db;
use generated_types::database_rules::encode_database_rules;
use influxdb_iox_client::{connection::Builder, write};
use lifecycle::LockableChunk;
use rand::seq::SliceRandom;
use std::collections::HashMap;
mod config;
mod connection;
pub mod db;
mod init;
@@ -212,11 +210,13 @@ pub enum Error {
#[snafu(display("all remotes failed connecting: {:?}", errors))]
NoRemoteReachable {
errors: HashMap<GRpcConnectionString, ConnectionManagerError>,
errors: HashMap<GRpcConnectionString, connection::ConnectionManagerError>,
},
#[snafu(display("remote error: {}", source))]
RemoteError { source: ConnectionManagerError },
RemoteError {
source: connection::ConnectionManagerError,
},
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: DatabaseError },
@@ -1242,141 +1242,9 @@ where
}
}
type RemoteServerError = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(Debug, Snafu)]
pub enum ConnectionManagerError {
#[snafu(display("cannot connect to remote: {}", source))]
RemoteServerConnectError { source: RemoteServerError },
#[snafu(display("cannot write to remote: {}", source))]
RemoteServerWriteError { source: write::WriteError },
}
/// The `Server` will ask the `ConnectionManager` for connections to a specific
/// remote server. These connections can be used to communicate with other
/// servers. This is implemented as a trait for dependency injection in testing.
#[async_trait]
pub trait ConnectionManager {
type RemoteServer: RemoteServer + Send + Sync + 'static;
async fn remote_server(
&self,
connect: &str,
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError>;
}
/// The `RemoteServer` represents the API for replicating, subscribing, and
/// querying other servers.
#[async_trait]
pub trait RemoteServer {
/// Sends an Entry to the remote server. An IOx server acting as a
/// router/sharder will call this method to send entries to remotes.
async fn write_entry(&self, db: &str, entry: Entry) -> Result<(), ConnectionManagerError>;
}
/// The connection manager maps a host identifier to a remote server.
#[derive(Debug)]
pub struct ConnectionManagerImpl {
cache: LoadingCache<String, Arc<RemoteServerImpl>, CacheFillError>,
}
// The error type must be `Clone` because `LoadingCache` requires it.
#[derive(Debug, Snafu, Clone)]
pub enum CacheFillError {
#[snafu(display("gRPC error: {}", source))]
GrpcError {
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
},
}
impl ConnectionManagerImpl {
pub fn new() -> Self {
let (cache, _) = LoadingCache::new(Self::cached_remote_server);
Self { cache }
}
async fn cached_remote_server(
connect: String,
) -> Result<Arc<RemoteServerImpl>, CacheFillError> {
let connection = Builder::default()
.build(&connect)
.await
.map_err(|e| Arc::new(e) as _)
.context(GrpcError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
}
}
impl Default for ConnectionManagerImpl {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ConnectionManager for ConnectionManagerImpl {
type RemoteServer = RemoteServerImpl;
async fn remote_server(
&self,
connect: &str,
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
let ret = self
.cache
.get_with_meta(connect.to_string())
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
debug!(was_cached=%ret.cached, %connect, "getting remote connection");
Ok(ret.result)
}
}
/// An implementation for communicating with other IOx servers. This should
be moved into and implemented in an influxdb_iox_client crate at a later
/// date.
#[derive(Debug)]
pub struct RemoteServerImpl {
client: write::Client,
}
#[async_trait]
impl RemoteServer for RemoteServerImpl {
/// Sends an Entry to the remote server. An IOx server acting as a
/// router/sharder will call this method to send entries to remotes.
async fn write_entry(&self, db_name: &str, entry: Entry) -> Result<(), ConnectionManagerError> {
self.client
.clone() // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
.write_entry(db_name, entry)
.await
.context(RemoteServerWriteError)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::TestDb;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use async_trait::async_trait;
use bytes::Bytes;
use data_types::chunk_metadata::ChunkAddr;
use data_types::database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
};
use entry::test_helpers::lp_to_entry;
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use metrics::MetricRegistry;
use object_store::path::ObjectStorePath;
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use snafu::Snafu;
use std::{
collections::BTreeMap,
convert::TryFrom,
sync::{
atomic::{AtomicBool, Ordering},
@@ -1384,11 +1252,34 @@ mod tests {
},
time::{Duration, Instant},
};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use futures::TryStreamExt;
use tempfile::TempDir;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use arrow_util::assert_batches_eq;
use connection::test_helpers::TestConnectionManager;
use data_types::chunk_metadata::ChunkAddr;
use data_types::database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
};
use entry::test_helpers::lp_to_entry;
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use metrics::MetricRegistry;
use object_store::path::ObjectStorePath;
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use write_buffer::mock::MockBufferForWritingThatAlwaysErrors;
use crate::connection::test_helpers::TestRemoteServer;
use crate::utils::TestDb;
use super::*;
const ARBITRARY_DEFAULT_TIME: i64 = 456;
// TODO: perhaps switch to a builder pattern.
@@ -1796,7 +1687,7 @@ mod tests {
err,
Error::NoRemoteReachable { errors } if matches!(
errors[BAD_REMOTE_ADDR],
ConnectionManagerError::RemoteServerConnectError {..}
connection::ConnectionManagerError::RemoteServerConnectError {..}
)
));
assert!(!written_1.load(Ordering::Relaxed));
@@ -1919,63 +1810,6 @@ mod tests {
let _ = background_handle.await;
}
#[derive(Snafu, Debug, Clone)]
enum TestClusterError {
#[snafu(display("Test cluster error: {}", message))]
General { message: String },
}
#[derive(Debug)]
struct TestConnectionManager {
remotes: BTreeMap<String, Arc<TestRemoteServer>>,
}
impl TestConnectionManager {
fn new() -> Self {
Self {
remotes: BTreeMap::new(),
}
}
}
#[async_trait]
impl ConnectionManager for TestConnectionManager {
type RemoteServer = TestRemoteServer;
async fn remote_server(
&self,
id: &str,
) -> Result<Arc<TestRemoteServer>, ConnectionManagerError> {
#[derive(Debug, Snafu)]
enum TestRemoteError {
#[snafu(display("remote not found"))]
NotFound,
}
Ok(Arc::clone(self.remotes.get(id).ok_or_else(|| {
ConnectionManagerError::RemoteServerConnectError {
source: Box::new(TestRemoteError::NotFound),
}
})?))
}
}
#[derive(Debug)]
struct TestRemoteServer {
written: Arc<AtomicBool>,
}
#[async_trait]
impl<'a> RemoteServer for TestRemoteServer {
async fn write_entry(
&self,
_db: &str,
_entry: Entry,
) -> Result<(), ConnectionManagerError> {
self.written.store(true, Ordering::Relaxed);
Ok(())
}
}
fn parsed_lines(lp: &str) -> Vec<ParsedLine<'_>> {
parse_lines(lp).map(|l| l.unwrap()).collect()
}