refactor: extract server init code

This prepares for #1624, so the end results looks a bit cleaner.
pull/24376/head
Marco Neumann 2021-06-09 16:53:11 +02:00
parent a614fef5bc
commit d9c38dfe88
5 changed files with 352 additions and 242 deletions

280
server/src/init.rs Normal file
View File

@ -0,0 +1,280 @@
//! Routines to initialize a server.
use data_types::server_id::ServerId;
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use internal_types::once::OnceNonZeroU32;
use object_store::{
path::{ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{error, info, warn};
use query::exec::Executor;
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::Semaphore;
use crate::{
config::{Config, DB_RULES_FILE_NAME},
db::load_or_create_preserved_catalog,
DatabaseError,
};
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("cannot load catalog: {}", source))]
CatalogLoadError { source: DatabaseError },
#[snafu(display("error deserializing database rules from protobuf: {}", source))]
ErrorDeserializingRulesProtobuf {
source: generated_types::database_rules::DecodeError,
},
#[snafu(display("id already set"))]
IdAlreadySet { id: ServerId },
#[snafu(display("unable to use server until id is set"))]
IdNotSet,
#[snafu(display(
"no database configuration present in directory that contains data: {:?}",
location
))]
NoDatabaseConfigError { location: object_store::path::Path },
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("Cannot create DB: {}", source))]
CreateDbError { source: Box<crate::Error> },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Default)]
pub struct CurrentServerId(OnceNonZeroU32);
impl CurrentServerId {
pub fn set(&self, id: ServerId) -> Result<()> {
self.0.set(id.get()).map_err(|id| Error::IdAlreadySet {
id: ServerId::new(id),
})
}
pub fn get(&self) -> Result<ServerId> {
self.0.get().map(ServerId::new).context(IdNotSet)
}
}
#[derive(Debug)]
pub struct InitStatus {
pub server_id: CurrentServerId,
/// Flags that databases are loaded and server is ready to read/write data.
initialized: AtomicBool,
/// Semaphore that limits the number of jobs that load DBs when the serverID is set.
///
/// Note that this semaphore is more of a "lock" than an arbitrary semaphore. All the other sync structures (mutex,
/// rwlock) require something to be wrapped which we don't have in our case, so we're using a semaphore here. We
/// want exactly 1 background worker to mess with the server init / DB loading, otherwise everything in the critical
/// section (in [`maybe_initialize_server`](Self::maybe_initialize_server)) will break apart. So this semaphore
/// cannot be configured.
initialize_semaphore: Semaphore,
}
impl InitStatus {
/// Create new "not initialized" status.
pub fn new() -> Self {
Self {
server_id: Default::default(),
initialized: AtomicBool::new(false),
// Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`.
initialize_semaphore: Semaphore::new(1),
}
}
/// Base location in object store for this writer.
pub fn root_path(&self, store: &ObjectStore) -> Result<Path> {
let id = self.server_id.get()?;
let mut path = store.new_path();
path.push_dir(format!("{}", id));
Ok(path)
}
/// Check if server is loaded. Databases are loaded and server is ready to read/write.
pub fn initialized(&self) -> bool {
// ordering here isn't that important since this method is not used to check-and-modify the flag
self.initialized.load(Ordering::Relaxed)
}
/// Loads the database configurations based on the databases in the
/// object store. Any databases in the config already won't be
/// replaced.
///
/// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready.
pub(crate) async fn maybe_initialize_server(
&self,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
) -> Result<()> {
let _guard = self
.initialize_semaphore
.acquire()
.await
.expect("semaphore should not be closed");
// Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread
// that enters this semaphore after we've left actually sees the correct `is_ready` flag.
if self.initialized.load(Ordering::Acquire) {
// already loaded, so do nothing
return Ok(());
}
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = store
.list_with_delimiter(&self.root_path(&store)?)
.await
.context(StoreError)?;
let server_id = self.server_id.get()?;
let handles: Vec<_> = list_result
.common_prefixes
.into_iter()
.map(|mut path| {
let store = Arc::clone(&store);
let config = Arc::clone(&config);
let exec = Arc::clone(&exec);
path.set_file_name(DB_RULES_FILE_NAME);
tokio::task::spawn(async move {
if let Err(e) =
Self::load_database_config(server_id, store, config, exec, path).await
{
error!(%e, "cannot load database");
}
})
})
.collect();
futures::future::join_all(handles).await;
// mark as ready (use correct ordering for Acquire-Release)
self.initialized.store(true, Ordering::Release);
info!("loaded databases, server is initalized");
Ok(())
}
async fn load_database_config(
server_id: ServerId,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
path: Path,
) -> Result<()> {
let serialized_rules = loop {
match get_database_config_bytes(&path, &store).await {
Ok(data) => break data,
Err(e) => {
if let Error::NoDatabaseConfigError { location } = &e {
warn!(?location, "{}", e);
return Ok(());
}
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
.await;
}
}
};
let rules = decode_database_rules(serialized_rules.freeze())
.context(ErrorDeserializingRulesProtobuf)?;
let preserved_catalog = load_or_create_preserved_catalog(
rules.db_name(),
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let handle = config
.create_db(rules)
.map_err(Box::new)
.context(CreateDbError)?;
handle.commit(server_id, store, exec, preserved_catalog);
Ok(())
}
}
// get bytes from the location in object store
async fn get_store_bytes(
location: &object_store::path::Path,
store: &ObjectStore,
) -> Result<bytes::BytesMut> {
let b = store
.get(location)
.await
.context(StoreError)?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(StoreError)?;
Ok(b)
}
// get the bytes for the database rule config file, if it exists,
// otherwise it returns none.
async fn get_database_config_bytes(
location: &object_store::path::Path,
store: &ObjectStore,
) -> Result<bytes::BytesMut> {
let list_result = store
.list_with_delimiter(location)
.await
.context(StoreError)?;
if list_result.objects.is_empty() {
return NoDatabaseConfigError {
location: location.clone(),
}
.fail();
}
get_store_bytes(location, store).await
}
#[cfg(test)]
mod tests {
use object_store::{memory::InMemory, path::ObjectStorePath};
use super::*;
#[tokio::test]
async fn test_get_database_config_bytes() {
let object_store = ObjectStore::new_in_memory(InMemory::new());
let mut rules_path = object_store.new_path();
rules_path.push_all_dirs(&["1", "foo_bar"]);
rules_path.set_file_name("rules.pb");
let res = get_database_config_bytes(&rules_path, &object_store)
.await
.unwrap_err();
assert!(matches!(res, Error::NoDatabaseConfigError { .. }));
}
}

View File

@ -68,16 +68,13 @@
)] )]
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut; use bytes::BytesMut;
use cached::proc_macro::cached; use cached::proc_macro::cached;
use db::load_or_create_preserved_catalog; use db::load_or_create_preserved_catalog;
use futures::stream::TryStreamExt; use init::InitStatus;
use object_store::path::Path;
use observability_deps::tracing::{debug, error, info, warn}; use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::Mutex; use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
@ -90,27 +87,24 @@ use data_types::{
}; };
use entry::{lines_to_sharded_entries, Entry, ShardedEntry}; use entry::{lines_to_sharded_entries, Entry, ShardedEntry};
use influxdb_line_protocol::ParsedLine; use influxdb_line_protocol::ParsedLine;
use internal_types::once::OnceNonZeroU32;
use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry}; use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; use object_store::{ObjectStore, ObjectStoreApi};
use query::{exec::Executor, DatabaseStore}; use query::{exec::Executor, DatabaseStore};
use tokio::sync::Semaphore;
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
pub use crate::config::RemoteTemplate; pub use crate::config::RemoteTemplate;
use crate::config::{ use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString};
object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME,
};
use cached::Return; use cached::Return;
use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId}; use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId};
pub use db::Db; pub use db::Db;
use generated_types::database_rules::{decode_database_rules, encode_database_rules}; use generated_types::database_rules::encode_database_rules;
use influxdb_iox_client::{connection::Builder, write}; use influxdb_iox_client::{connection::Builder, write};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use std::collections::HashMap; use std::collections::HashMap;
mod config; mod config;
pub mod db; pub mod db;
mod init;
mod write_buffer; mod write_buffer;
/// Utility modules used by benchmarks and tests /// Utility modules used by benchmarks and tests
@ -147,12 +141,6 @@ pub enum Error {
#[snafu(display("error replicating to remote: {}", source))] #[snafu(display("error replicating to remote: {}", source))]
ErrorReplicating { source: DatabaseError }, ErrorReplicating { source: DatabaseError },
#[snafu(display("id already set"))]
IdAlreadySet { id: ServerId },
#[snafu(display("unable to use server until id is set"))]
IdNotSet,
#[snafu(display( #[snafu(display(
"server ID is set but DBs are not yet loaded. Server is not yet ready to read/write data." "server ID is set but DBs are not yet loaded. Server is not yet ready to read/write data."
))] ))]
@ -163,23 +151,12 @@ pub enum Error {
source: generated_types::database_rules::EncodeError, source: generated_types::database_rules::EncodeError,
}, },
#[snafu(display("error deserializing database rules from protobuf: {}", source))]
ErrorDeserializingRulesProtobuf {
source: generated_types::database_rules::DecodeError,
},
#[snafu(display("error deserializing configuration {}", source))] #[snafu(display("error deserializing configuration {}", source))]
ErrorDeserializing { source: serde_json::Error }, ErrorDeserializing { source: serde_json::Error },
#[snafu(display("store error: {}", source))] #[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error }, StoreError { source: object_store::Error },
#[snafu(display(
"no database configuration present in directory that contains data: {:?}",
location
))]
NoDatabaseConfigError { location: object_store::path::Path },
#[snafu(display("database already exists"))] #[snafu(display("database already exists"))]
DatabaseAlreadyExists { db_name: String }, DatabaseAlreadyExists { db_name: String },
@ -210,6 +187,12 @@ pub enum Error {
#[snafu(display("cannot load catalog: {}", source))] #[snafu(display("cannot load catalog: {}", source))]
CatalogLoadError { source: DatabaseError }, CatalogLoadError { source: DatabaseError },
#[snafu(display("cannot set id: {}", source))]
SetIdError { source: crate::init::Error },
#[snafu(display("cannot get id: {}", source))]
GetIdError { source: crate::init::Error },
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -246,8 +229,6 @@ impl JobRegistry {
} }
} }
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
/// Used to configure a server instance /// Used to configure a server instance
#[derive(Debug)] #[derive(Debug)]
pub struct ServerConfig { pub struct ServerConfig {
@ -373,27 +354,11 @@ impl ServerMetrics {
} }
} }
#[derive(Debug, Default)]
struct CurrentServerId(OnceNonZeroU32);
impl CurrentServerId {
fn set(&self, id: ServerId) -> Result<()> {
self.0.set(id.get()).map_err(|id| Error::IdAlreadySet {
id: ServerId::new(id),
})
}
fn get(&self) -> Result<ServerId> {
self.0.get().map(ServerId::new).context(IdNotSet)
}
}
/// `Server` is the container struct for how servers store data internally, as /// `Server` is the container struct for how servers store data internally, as
/// well as how they communicate with other servers. Each server will have one /// well as how they communicate with other servers. Each server will have one
/// of these structs, which keeps track of all replication and query rules. /// of these structs, which keeps track of all replication and query rules.
#[derive(Debug)] #[derive(Debug)]
pub struct Server<M: ConnectionManager> { pub struct Server<M: ConnectionManager> {
id: CurrentServerId,
config: Arc<Config>, config: Arc<Config>,
connection_manager: Arc<M>, connection_manager: Arc<M>,
pub store: Arc<ObjectStore>, pub store: Arc<ObjectStore>,
@ -406,17 +371,7 @@ pub struct Server<M: ConnectionManager> {
/// and populates the endpoint with this data. /// and populates the endpoint with this data.
pub registry: Arc<metrics::MetricRegistry>, pub registry: Arc<metrics::MetricRegistry>,
/// Flags that databases are loaded and server is ready to read/write data. init_status: InitStatus,
initialized: AtomicBool,
/// Semaphore that limits the number of jobs that load DBs when the serverID is set.
///
/// Note that this semaphore is more of a "lock" than an arbitrary semaphore. All the other sync structures (mutex,
/// rwlock) require something to be wrapped which we don't have in our case, so we're using a semaphore here. We
/// want exactly 1 background worker to mess with the server init / DB loading, otherwise everything in the critical
/// section (in [`maybe_initialize_server`](Self::maybe_initialize_server)) will break apart. So this semaphore
/// cannot be configured.
initialize_semaphore: Semaphore,
} }
#[derive(Debug)] #[derive(Debug)]
@ -445,7 +400,6 @@ impl<M: ConnectionManager> Server<M> {
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get); let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
Self { Self {
id: Default::default(),
config: Arc::new(Config::new( config: Arc::new(Config::new(
Arc::clone(&jobs), Arc::clone(&jobs),
Arc::clone(&metric_registry), Arc::clone(&metric_registry),
@ -457,9 +411,7 @@ impl<M: ConnectionManager> Server<M> {
jobs, jobs,
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))), metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
registry: Arc::clone(&metric_registry), registry: Arc::clone(&metric_registry),
initialized: AtomicBool::new(false), init_status: InitStatus::new(),
// Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`.
initialize_semaphore: Semaphore::new(1),
} }
} }
@ -468,18 +420,17 @@ impl<M: ConnectionManager> Server<M> {
/// ///
/// A valid server ID Must be non-zero. /// A valid server ID Must be non-zero.
pub fn set_id(&self, id: ServerId) -> Result<()> { pub fn set_id(&self, id: ServerId) -> Result<()> {
self.id.set(id) self.init_status.server_id.set(id).context(SetIdError)
} }
/// Returns the current server ID, or an error if not yet set. /// Returns the current server ID, or an error if not yet set.
pub fn require_id(&self) -> Result<ServerId> { pub fn require_id(&self) -> Result<ServerId> {
self.id.get() self.init_status.server_id.get().context(GetIdError)
} }
/// Check if server is loaded. Databases are loaded and server is ready to read/write. /// Check if server is loaded. Databases are loaded and server is ready to read/write.
pub fn initialized(&self) -> bool { pub fn initialized(&self) -> bool {
// ordering here isn't that important since this method is not used to check-and-modify the flag self.init_status.initialized()
self.initialized.load(Ordering::Relaxed)
} }
/// Require that server is loaded. Databases are loaded and server is ready to read/write. /// Require that server is loaded. Databases are loaded and server is ready to read/write.
@ -524,7 +475,13 @@ impl<M: ConnectionManager> Server<M> {
} }
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> { pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name); let location = object_store_path_for_database_config(
&self
.init_status
.root_path(&self.store)
.context(GetIdError)?,
&rules.name,
);
let mut data = BytesMut::new(); let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?; encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
@ -543,115 +500,23 @@ impl<M: ConnectionManager> Server<M> {
Ok(()) Ok(())
} }
// base location in object store for this writer
fn root_path(&self) -> Result<object_store::path::Path> {
let id = self.require_id()?;
let mut path = self.store.new_path();
path.push_dir(format!("{}", id));
Ok(path)
}
/// Loads the database configurations based on the databases in the /// Loads the database configurations based on the databases in the
/// object store. Any databases in the config already won't be /// object store. Any databases in the config already won't be
/// replaced. /// replaced.
/// ///
/// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready. /// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready.
pub async fn maybe_initialize_server(&self) -> Result<()> { pub async fn maybe_initialize_server(&self) {
let _guard = self if let Err(e) = self
.initialize_semaphore .init_status
.acquire() .maybe_initialize_server(
Arc::clone(&self.store),
Arc::clone(&self.config),
Arc::clone(&self.exec),
)
.await .await
.expect("semaphore should not be closed"); {
error!(%e, "error during DB loading");
// Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread
// that enters this semaphore after we've left actually sees the correct `is_ready` flag.
if self.initialized.load(Ordering::Acquire) {
// already loaded, so do nothing
return Ok(());
} }
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = self
.store
.list_with_delimiter(&self.root_path()?)
.await
.context(StoreError)?;
let server_id = self.require_id()?;
let handles: Vec<_> = list_result
.common_prefixes
.into_iter()
.map(|mut path| {
let store = Arc::clone(&self.store);
let config = Arc::clone(&self.config);
let exec = Arc::clone(&self.exec);
path.set_file_name(DB_RULES_FILE_NAME);
tokio::task::spawn(async move {
if let Err(e) =
Self::load_database_config(server_id, store, config, exec, path).await
{
error!(%e, "cannot load database");
}
})
})
.collect();
futures::future::join_all(handles).await;
// mark as ready (use correct ordering for Acquire-Release)
self.initialized.store(true, Ordering::Release);
info!("loaded databases, server is initalized");
Ok(())
}
async fn load_database_config(
server_id: ServerId,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
path: Path,
) -> Result<()> {
let serialized_rules = loop {
match get_database_config_bytes(&path, &store).await {
Ok(data) => break data,
Err(e) => {
if let Error::NoDatabaseConfigError { location } = &e {
warn!(?location, "{}", e);
return Ok(());
}
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
.await;
}
}
};
let rules = decode_database_rules(serialized_rules.freeze())
.context(ErrorDeserializingRulesProtobuf)?;
let preserved_catalog = load_or_create_preserved_catalog(
rules.db_name(),
Arc::clone(&store),
server_id,
config.metrics_registry(),
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let handle = config.create_db(rules)?;
handle.commit(server_id, store, exec, preserved_catalog);
Ok(())
} }
/// `write_lines` takes in raw line protocol and converts it to a collection /// `write_lines` takes in raw line protocol and converts it to a collection
@ -940,9 +805,7 @@ impl<M: ConnectionManager> Server<M> {
while !shutdown.is_cancelled() { while !shutdown.is_cancelled() {
if self.require_id().is_ok() { if self.require_id().is_ok() {
if let Err(e) = self.maybe_initialize_server().await { self.maybe_initialize_server().await;
error!(%e, "error during DB loading");
}
} }
self.jobs.inner.lock().reclaim(); self.jobs.inner.lock().reclaim();
@ -1104,42 +967,6 @@ impl RemoteServer for RemoteServerImpl {
} }
} }
// get bytes from the location in object store
async fn get_store_bytes(
location: &object_store::path::Path,
store: &ObjectStore,
) -> Result<bytes::BytesMut> {
let b = store
.get(location)
.await
.context(StoreError)?
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.context(StoreError)?;
Ok(b)
}
// get the bytes for the database rule config file, if it exists,
// otherwise it returns none.
async fn get_database_config_bytes(
location: &object_store::path::Path,
store: &ObjectStore,
) -> Result<bytes::BytesMut> {
let list_result = store
.list_with_delimiter(location)
.await
.context(StoreError)?;
if list_result.objects.is_empty() {
return NoDatabaseConfigError {
location: location.clone(),
}
.fail();
}
get_store_bytes(location, store).await
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
@ -1150,6 +977,7 @@ mod tests {
use async_trait::async_trait; use async_trait::async_trait;
use futures::TryStreamExt; use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use snafu::Snafu; use snafu::Snafu;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -1193,33 +1021,30 @@ mod tests {
config_with_metric_registry_and_store(object_store).1 config_with_metric_registry_and_store(object_store).1
} }
#[tokio::test]
async fn test_get_database_config_bytes() {
let object_store = ObjectStore::new_in_memory(InMemory::new());
let mut rules_path = object_store.new_path();
rules_path.push_all_dirs(&["1", "foo_bar"]);
rules_path.set_file_name("rules.pb");
let res = get_database_config_bytes(&rules_path, &object_store)
.await
.unwrap_err();
assert!(matches!(res, Error::NoDatabaseConfigError { .. }));
}
#[tokio::test] #[tokio::test]
async fn server_api_calls_return_error_with_no_id_set() { async fn server_api_calls_return_error_with_no_id_set() {
let manager = TestConnectionManager::new(); let manager = TestConnectionManager::new();
let server = Server::new(manager, config()); let server = Server::new(manager, config());
let resp = server.require_id().unwrap_err(); let resp = server.require_id().unwrap_err();
assert!(matches!(resp, Error::IdNotSet)); assert!(matches!(
resp,
Error::GetIdError {
source: crate::init::Error::IdNotSet
}
));
let lines = parsed_lines("cpu foo=1 10"); let lines = parsed_lines("cpu foo=1 10");
let resp = server let resp = server
.write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME) .write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
.await .await
.unwrap_err(); .unwrap_err();
assert!(matches!(resp, Error::IdNotSet)); assert!(matches!(
resp,
Error::GetIdError {
source: crate::init::Error::IdNotSet
}
));
} }
#[tokio::test] #[tokio::test]
@ -1229,7 +1054,7 @@ mod tests {
let store = config.store(); let store = config.store();
let server = Server::new(manager, config); let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let name = DatabaseName::new("bananas").unwrap(); let name = DatabaseName::new("bananas").unwrap();
@ -1282,7 +1107,7 @@ mod tests {
.with_num_worker_threads(1); .with_num_worker_threads(1);
let server2 = Server::new(manager, config2); let server2 = Server::new(manager, config2);
server2.set_id(ServerId::try_from(1).unwrap()).unwrap(); server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
server2.maybe_initialize_server().await.unwrap(); server2.maybe_initialize_server().await;
let _ = server2.db(&db2).unwrap(); let _ = server2.db(&db2).unwrap();
let _ = server2.db(&name).unwrap(); let _ = server2.db(&name).unwrap();
@ -1295,7 +1120,7 @@ mod tests {
let manager = TestConnectionManager::new(); let manager = TestConnectionManager::new();
let server = Server::new(manager, config()); let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let name = DatabaseName::new("bananas").unwrap(); let name = DatabaseName::new("bananas").unwrap();
@ -1346,7 +1171,7 @@ mod tests {
let config = config_with_store(store); let config = config_with_store(store);
let server = Server::new(manager, config); let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
create_simple_database(&server, "bananas") create_simple_database(&server, "bananas")
.await .await
.expect("failed to create database"); .expect("failed to create database");
@ -1362,7 +1187,7 @@ mod tests {
let config = config_with_store(store); let config = config_with_store(store);
let server = Server::new(manager, config); let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.expect("load config"); server.maybe_initialize_server().await;
create_simple_database(&server, "apples") create_simple_database(&server, "apples")
.await .await
@ -1382,7 +1207,7 @@ mod tests {
let config = config_with_store(store); let config = config_with_store(store);
let server = Server::new(manager, config); let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.expect("load config"); server.maybe_initialize_server().await;
assert_eq!(server.db_names_sorted(), vec!["apples"]); assert_eq!(server.db_names_sorted(), vec!["apples"]);
} }
@ -1392,7 +1217,7 @@ mod tests {
let manager = TestConnectionManager::new(); let manager = TestConnectionManager::new();
let server = Server::new(manager, config()); let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.expect("load config"); server.maybe_initialize_server().await;
let names = vec!["bar", "baz"]; let names = vec!["bar", "baz"];
@ -1413,7 +1238,7 @@ mod tests {
let manager = TestConnectionManager::new(); let manager = TestConnectionManager::new();
let server = Server::new(manager, config()); let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let name = DatabaseName::new("foo".to_string()).unwrap(); let name = DatabaseName::new("foo".to_string()).unwrap();
server server
@ -1454,7 +1279,7 @@ mod tests {
let manager = TestConnectionManager::new(); let manager = TestConnectionManager::new();
let server = Server::new(manager, config); let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let name = DatabaseName::new("foo".to_string()).unwrap(); let name = DatabaseName::new("foo".to_string()).unwrap();
server server
@ -1541,7 +1366,7 @@ mod tests {
let server = Server::new(manager, config()); let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let db_name = DatabaseName::new("foo").unwrap(); let db_name = DatabaseName::new("foo").unwrap();
server server
@ -1622,7 +1447,7 @@ mod tests {
let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let db_name = DatabaseName::new("foo").unwrap(); let db_name = DatabaseName::new("foo").unwrap();
server server
@ -1776,7 +1601,7 @@ mod tests {
let manager = TestConnectionManager::new(); let manager = TestConnectionManager::new();
let server = Server::new(manager, config()); let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_initialize_server().await.unwrap(); server.maybe_initialize_server().await;
let name = DatabaseName::new("foo".to_string()).unwrap(); let name = DatabaseName::new("foo".to_string()).unwrap();
server server
@ -1834,7 +1659,12 @@ mod tests {
let err = create_simple_database(&server, "bananas") let err = create_simple_database(&server, "bananas")
.await .await
.unwrap_err(); .unwrap_err();
assert!(matches!(err, Error::IdNotSet)); assert!(matches!(
err,
Error::GetIdError {
source: crate::init::Error::IdNotSet
}
));
server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.set_id(ServerId::try_from(1).unwrap()).unwrap();
// do NOT call `server.maybe_load_database_configs` so DBs are not loaded and server is not ready // do NOT call `server.maybe_load_database_configs` so DBs are not loaded and server is not ready

View File

@ -899,7 +899,7 @@ mod tests {
let (_, config) = config(); let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await.unwrap(); app_server.maybe_initialize_server().await;
app_server app_server
.create_database(DatabaseRules::new( .create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(), DatabaseName::new("MyOrg_MyBucket").unwrap(),
@ -947,7 +947,7 @@ mod tests {
let (metrics_registry, config) = config(); let (metrics_registry, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await.unwrap(); app_server.maybe_initialize_server().await;
app_server app_server
.create_database(DatabaseRules::new( .create_database(DatabaseRules::new(
DatabaseName::new("MetricsOrg_MetricsBucket").unwrap(), DatabaseName::new("MetricsOrg_MetricsBucket").unwrap(),
@ -1037,7 +1037,7 @@ mod tests {
let (_, config) = config(); let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await.unwrap(); app_server.maybe_initialize_server().await;
app_server app_server
.create_database(DatabaseRules::new( .create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(), DatabaseName::new("MyOrg_MyBucket").unwrap(),
@ -1174,7 +1174,7 @@ mod tests {
let (_, config) = config(); let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await.unwrap(); app_server.maybe_initialize_server().await;
app_server app_server
.create_database(DatabaseRules::new( .create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(), DatabaseName::new("MyOrg_MyBucket").unwrap(),
@ -1223,7 +1223,7 @@ mod tests {
let (_, config) = config(); let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await.unwrap(); app_server.maybe_initialize_server().await;
app_server app_server
.create_database(DatabaseRules::new( .create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(), DatabaseName::new("MyOrg_MyBucket").unwrap(),

View File

@ -8,7 +8,7 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
use server::Error; use server::Error;
match error { match error {
Error::IdNotSet => PreconditionViolation { Error::GetIdError { .. } => PreconditionViolation {
category: "Writer ID".to_string(), category: "Writer ID".to_string(),
subject: "influxdata.com/iox".to_string(), subject: "influxdata.com/iox".to_string(),
description: "Writer ID must be set".to_string(), description: "Writer ID must be set".to_string(),

View File

@ -71,7 +71,7 @@ where
match self.server.set_id(id) { match self.server.set_id(id) {
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})), Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
Err(e @ Error::IdAlreadySet { .. }) => { Err(e @ Error::SetIdError { .. }) => {
return Err(FieldViolation { return Err(FieldViolation {
field: "id".to_string(), field: "id".to_string(),
description: e.to_string(), description: e.to_string(),