refactor: extract resolver from server::Config (#2143)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-29 14:14:58 +01:00 committed by GitHub
parent 336ff30484
commit 431774c8b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 155 deletions

View File

@ -68,7 +68,6 @@ impl Config {
exec: Arc<Executor>,
server_id: ServerId,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
) -> Self {
Self {
jobs,
@ -77,7 +76,7 @@ impl Config {
server_id,
metric_registry,
shutdown: Default::default(),
state: RwLock::new(ConfigState::new(remote_template)),
state: Default::default(),
}
}
@ -228,34 +227,6 @@ impl Config {
db.update_rules(update).map_err(UpdateError::Closure)
}
/// Get all registered remote servers.
pub(crate) fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
let state = self.state.read().expect("mutex poisoned");
state.remotes.iter().map(|(&a, b)| (a, b.clone())).collect()
}
/// Update given remote server.
pub(crate) fn update_remote(&self, id: ServerId, addr: GRpcConnectionString) {
let mut state = self.state.write().expect("mutex poisoned");
state.remotes.insert(id, addr);
}
/// Delete remote server by ID.
pub(crate) fn delete_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
let mut state = self.state.write().expect("mutex poisoned");
state.remotes.remove(&id)
}
/// Get remote server by ID.
pub(crate) fn resolve_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
let state = self.state.read().expect("mutex poisoned");
state
.remotes
.get(&id)
.cloned()
.or_else(|| state.remote_template.as_ref().map(|t| t.get(&id)))
}
/// Commit new or unchanged database state.
fn commit_db(&self, db_state: Arc<DatabaseState>) {
let mut state = self.state.write().expect("mutex poisoned");
@ -336,9 +307,6 @@ pub fn object_store_path_for_database_config<P: ObjectStorePath>(
path
}
/// A gRPC connection string.
pub type GRpcConnectionString = String;
/// Inner config state that is protected by a lock.
#[derive(Default, Debug)]
struct ConfigState {
@ -347,40 +315,6 @@ struct ConfigState {
/// Databases in different states.
databases: BTreeMap<DatabaseName<'static>, Arc<DatabaseState>>,
/// Map between remote IOx server IDs and management API connection strings.
remotes: BTreeMap<ServerId, GRpcConnectionString>,
/// Static map between remote server IDs and hostnames based on a template
remote_template: Option<RemoteTemplate>,
}
impl ConfigState {
fn new(remote_template: Option<RemoteTemplate>) -> Self {
Self {
remote_template,
..Default::default()
}
}
}
/// A RemoteTemplate string is a remote connection template string.
/// Occurrences of the substring "{id}" in the template will be replaced
/// by the server ID.
#[derive(Debug)]
pub struct RemoteTemplate {
template: String,
}
impl RemoteTemplate {
pub fn new(template: impl Into<String>) -> Self {
let template = template.into();
Self { template }
}
fn get(&self, id: &ServerId) -> GRpcConnectionString {
self.template.replace("{id}", &format!("{}", id.get_u32()))
}
}
/// Internal representation of the different database states.
@ -703,9 +637,8 @@ mod test {
use crate::db::load::load_or_create_preserved_catalog;
use super::*;
use std::num::NonZeroU32;
fn make_config(remote_template: Option<RemoteTemplate>) -> Config {
fn make_config() -> Config {
let store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
@ -715,7 +648,6 @@ mod test {
Arc::new(Executor::new(1)),
server_id,
Arc::clone(&metric_registry),
remote_template,
)
}
@ -723,7 +655,7 @@ mod test {
async fn create_db() {
// setup
let name = DatabaseName::new("foo").unwrap();
let config = make_config(None);
let config = make_config();
let rules = DatabaseRules::new(name.clone());
// getting handle while DB is reserved => fails
@ -827,7 +759,7 @@ mod test {
async fn recover_db() {
// setup
let name = DatabaseName::new("foo").unwrap();
let config = make_config(None);
let config = make_config();
let rules = DatabaseRules::new(name.clone());
// create DB but don't continue with rules loaded (e.g. because the rules file is broken)
@ -914,7 +846,7 @@ mod test {
async fn block_db() {
// setup
let name = DatabaseName::new("foo").unwrap();
let config = make_config(None);
let config = make_config();
// block DB
let handle = config.block_db(name.clone()).unwrap();
@ -943,7 +875,7 @@ mod test {
async fn test_db_drop() {
// setup
let name = DatabaseName::new("foo").unwrap();
let config = make_config(None);
let config = make_config();
let rules = DatabaseRules::new(name.clone());
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
&name,
@ -1000,23 +932,4 @@ mod test {
assert_eq!(rules_path, expected_path);
}
#[test]
fn resolve_remote() {
let config = make_config(Some(RemoteTemplate::new("http://iox-query-{id}:8082")));
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
let remote = config.resolve_remote(server_id);
assert_eq!(
remote,
Some(GRpcConnectionString::from("http://iox-query-42:8082"))
);
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
let remote = config.resolve_remote(server_id);
assert_eq!(
remote,
Some(GRpcConnectionString::from("http://iox-query-24:8082"))
);
}
}

View File

@ -74,7 +74,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use config::{object_store_path_for_database_config, Config, GRpcConnectionString};
use config::{object_store_path_for_database_config, Config};
use data_types::database_rules::ShardConfig;
use data_types::{
database_rules::{
@ -97,18 +97,20 @@ use observability_deps::tracing::{error, info, warn};
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use query::{exec::Executor, DatabaseStore};
use rand::seq::SliceRandom;
use resolver::Resolver;
use snafu::{OptionExt, ResultExt, Snafu};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
use write_buffer::config::WriteBufferConfig;
pub use config::RemoteTemplate;
pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer};
pub use db::Db;
pub use resolver::{GrpcConnectionString, RemoteTemplate};
mod config;
mod connection;
pub mod db;
mod init;
mod resolver;
/// Utility modules used by benchmarks and tests
pub mod utils;
@ -210,7 +212,7 @@ pub enum Error {
#[snafu(display("all remotes failed connecting: {:?}", errors))]
NoRemoteReachable {
errors: HashMap<GRpcConnectionString, connection::ConnectionManagerError>,
errors: HashMap<GrpcConnectionString, connection::ConnectionManagerError>,
},
#[snafu(display("remote error: {}", source))]
@ -429,6 +431,9 @@ pub struct Server<M: ConnectionManager> {
/// and populates the endpoint with this data.
registry: Arc<metrics::MetricRegistry>,
/// Resolver for mapping ServerId to gRPC connection strings
resolver: RwLock<Resolver>,
/// The state machine for server startup
stage: Arc<RwLock<ServerStage>>,
}
@ -448,10 +453,7 @@ pub struct Server<M: ConnectionManager> {
#[derive(Debug)]
enum ServerStage {
/// Server has started but doesn't have a server id yet
Startup {
remote_template: Option<RemoteTemplate>,
wipe_catalog_on_error: bool,
},
Startup { wipe_catalog_on_error: bool },
/// Server can be initialized
InitReady {
@ -513,8 +515,8 @@ where
jobs,
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
registry: Arc::clone(&metric_registry),
resolver: RwLock::new(Resolver::new(remote_template)),
stage: Arc::new(RwLock::new(ServerStage::Startup {
remote_template,
wipe_catalog_on_error,
})),
}
@ -528,11 +530,8 @@ where
let mut stage = self.stage.write();
match &mut *stage {
ServerStage::Startup {
remote_template,
wipe_catalog_on_error,
} => {
let remote_template = remote_template.take();
*stage = ServerStage::InitReady {
wipe_catalog_on_error: *wipe_catalog_on_error,
config: Arc::new(Config::new(
@ -541,7 +540,6 @@ where
Arc::clone(&self.exec),
id,
Arc::clone(&self.registry),
remote_template,
)),
last_error: None,
};
@ -907,12 +905,16 @@ where
entry: Entry,
) -> Result<()> {
// Return an error if this server is not yet ready
let config = self.config()?;
self.require_initialized()?;
let addrs: Vec<_> = {
let resolver = self.resolver.read();
node_group
.iter()
.filter_map(|&node| resolver.resolve_remote(node))
.collect()
};
let addrs: Vec<_> = node_group
.iter()
.filter_map(|&node| config.resolve_remote(node))
.collect();
if addrs.is_empty() {
return NoRemoteConfigured { node_group }.fail();
}
@ -1018,23 +1020,16 @@ where
Ok(rules)
}
pub fn remotes_sorted(&self) -> Result<Vec<(ServerId, String)>> {
// TODO: Should these be on ConnectionManager and not Config
let config = self.config()?;
Ok(config.remotes_sorted())
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
self.resolver.read().remotes_sorted()
}
pub fn update_remote(&self, id: ServerId, addr: GRpcConnectionString) -> Result<()> {
// TODO: Should these be on ConnectionManager and not Config
let config = self.config()?;
config.update_remote(id, addr);
Ok(())
pub fn update_remote(&self, id: ServerId, addr: GrpcConnectionString) {
self.resolver.write().update_remote(id, addr)
}
pub fn delete_remote(&self, id: ServerId) -> Result<Option<GRpcConnectionString>> {
// TODO: Should these be on ConnectionManager and not Config
let config = self.config()?;
Ok(config.delete_remote(id))
pub fn delete_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
self.resolver.write().delete_remote(id)
}
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> TaskTracker<Job> {
@ -1686,9 +1681,7 @@ mod tests {
);
// one remote is configured but it's down and we'll get connection error
server
.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into())
.unwrap();
server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into());
let err = server
.write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
.await
@ -1705,12 +1698,8 @@ mod tests {
// We configure the address for the other remote, this time connection will succeed
// despite the bad remote failing to connect.
server
.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into())
.unwrap();
server
.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into())
.unwrap();
server.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into());
server.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into());
// Remotes are tried in random order, so we need to repeat the test a few times to have a reasonable
// probability both the remotes will get hit.

91
server/src/resolver.rs Normal file
View File

@ -0,0 +1,91 @@
use data_types::server_id::ServerId;
use std::collections::BTreeMap;
/// A RemoteTemplate string is a remote connection template string.
/// Occurrences of the substring "{id}" in the template will be replaced
/// by the server ID.
#[derive(Debug)]
pub struct RemoteTemplate {
template: String,
}
impl RemoteTemplate {
pub fn new(template: impl Into<String>) -> Self {
let template = template.into();
Self { template }
}
fn get(&self, id: &ServerId) -> GrpcConnectionString {
self.template.replace("{id}", &format!("{}", id.get_u32()))
}
}
/// A gRPC connection string.
pub type GrpcConnectionString = String;
/// The Resolver provides a mapping between ServerId and GRpcConnectionString
#[derive(Debug)]
pub struct Resolver {
/// Map between remote IOx server IDs and management API connection strings.
remotes: BTreeMap<ServerId, GrpcConnectionString>,
/// Static map between remote server IDs and hostnames based on a template
remote_template: Option<RemoteTemplate>,
}
impl Resolver {
pub fn new(remote_template: Option<RemoteTemplate>) -> Self {
Self {
remotes: Default::default(),
remote_template,
}
}
/// Get all registered remote servers.
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
self.remotes.iter().map(|(&a, b)| (a, b.clone())).collect()
}
/// Update given remote server.
pub fn update_remote(&mut self, id: ServerId, addr: GrpcConnectionString) {
self.remotes.insert(id, addr);
}
/// Delete remote server by ID.
pub fn delete_remote(&mut self, id: ServerId) -> Option<GrpcConnectionString> {
self.remotes.remove(&id)
}
/// Get remote server by ID.
pub fn resolve_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
self.remotes
.get(&id)
.cloned()
.or_else(|| self.remote_template.as_ref().map(|t| t.get(&id)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::num::NonZeroU32;
#[test]
fn resolve_remote() {
let resolver = Resolver::new(Some(RemoteTemplate::new("http://iox-query-{id}:8082")));
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
let remote = resolver.resolve_remote(server_id);
assert_eq!(
remote,
Some(GrpcConnectionString::from("http://iox-query-42:8082"))
);
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
let remote = resolver.resolve_remote(server_id);
assert_eq!(
remote,
Some(GrpcConnectionString::from("http://iox-query-24:8082"))
);
}
}

View File

@ -198,17 +198,15 @@ where
&self,
_: Request<ListRemotesRequest>,
) -> Result<Response<ListRemotesResponse>, Status> {
let result = self.server.remotes_sorted();
let remotes = match result {
Ok(remotes) => remotes
.into_iter()
.map(|(id, connection_string)| Remote {
id: id.get_u32(),
connection_string,
})
.collect(),
Err(e) => return Err(default_server_error_handler(e)),
};
let remotes = self
.server
.remotes_sorted()
.into_iter()
.map(|(id, connection_string)| Remote {
id: id.get_u32(),
connection_string,
})
.collect();
Ok(Response::new(ListRemotesResponse { remotes }))
}
@ -224,15 +222,9 @@ where
let remote_id = ServerId::try_from(remote.id)
.map_err(|_| FieldViolation::required("id").scope("remote"))?;
let result = self
.server
self.server
.update_remote(remote_id, remote.connection_string);
match result {
Ok(_) => {}
Err(e) => return Err(default_server_error_handler(e)),
}
Ok(Response::new(UpdateRemoteResponse {}))
}
@ -245,9 +237,8 @@ where
ServerId::try_from(request.id).map_err(|_| FieldViolation::required("id"))?;
match self.server.delete_remote(remote_id) {
Ok(Some(_)) => {}
Ok(None) => return Err(NotFound::default().into()),
Err(e) => return Err(default_server_error_handler(e)),
Some(_) => {}
None => return Err(NotFound::default().into()),
}
Ok(Response::new(DeleteRemoteResponse {}))

View File

@ -67,8 +67,6 @@ async fn test_list_update_remotes() {
const TEST_REMOTE_ADDR_2: &str = "4.3.2.1:4321";
const TEST_REMOTE_ADDR_2_UPDATED: &str = "40.30.20.10:4321";
client.update_server_id(123).await.unwrap();
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 0);