Merge pull request #1670 from influxdata/crepererum/extract_server_init
refactor: extract server init codepull/24376/head
commit
21f6f58e97
|
@ -0,0 +1,280 @@
|
|||
//! Routines to initialize a server.
|
||||
use data_types::server_id::ServerId;
|
||||
use futures::TryStreamExt;
|
||||
use generated_types::database_rules::decode_database_rules;
|
||||
use internal_types::once::OnceNonZeroU32;
|
||||
use object_store::{
|
||||
path::{ObjectStorePath, Path},
|
||||
ObjectStore, ObjectStoreApi,
|
||||
};
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use query::exec::Executor;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::{
|
||||
config::{Config, DB_RULES_FILE_NAME},
|
||||
db::load_or_create_preserved_catalog,
|
||||
DatabaseError,
|
||||
};
|
||||
|
||||
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("cannot load catalog: {}", source))]
|
||||
CatalogLoadError { source: DatabaseError },
|
||||
|
||||
#[snafu(display("error deserializing database rules from protobuf: {}", source))]
|
||||
ErrorDeserializingRulesProtobuf {
|
||||
source: generated_types::database_rules::DecodeError,
|
||||
},
|
||||
|
||||
#[snafu(display("id already set"))]
|
||||
IdAlreadySet { id: ServerId },
|
||||
|
||||
#[snafu(display("unable to use server until id is set"))]
|
||||
IdNotSet,
|
||||
|
||||
#[snafu(display(
|
||||
"no database configuration present in directory that contains data: {:?}",
|
||||
location
|
||||
))]
|
||||
NoDatabaseConfigError { location: object_store::path::Path },
|
||||
|
||||
#[snafu(display("store error: {}", source))]
|
||||
StoreError { source: object_store::Error },
|
||||
|
||||
#[snafu(display("Cannot create DB: {}", source))]
|
||||
CreateDbError { source: Box<crate::Error> },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct CurrentServerId(OnceNonZeroU32);
|
||||
|
||||
impl CurrentServerId {
|
||||
pub fn set(&self, id: ServerId) -> Result<()> {
|
||||
self.0.set(id.get()).map_err(|id| Error::IdAlreadySet {
|
||||
id: ServerId::new(id),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get(&self) -> Result<ServerId> {
|
||||
self.0.get().map(ServerId::new).context(IdNotSet)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InitStatus {
|
||||
pub server_id: CurrentServerId,
|
||||
|
||||
/// Flags that databases are loaded and server is ready to read/write data.
|
||||
initialized: AtomicBool,
|
||||
|
||||
/// Semaphore that limits the number of jobs that load DBs when the serverID is set.
|
||||
///
|
||||
/// Note that this semaphore is more of a "lock" than an arbitrary semaphore. All the other sync structures (mutex,
|
||||
/// rwlock) require something to be wrapped which we don't have in our case, so we're using a semaphore here. We
|
||||
/// want exactly 1 background worker to mess with the server init / DB loading, otherwise everything in the critical
|
||||
/// section (in [`maybe_initialize_server`](Self::maybe_initialize_server)) will break apart. So this semaphore
|
||||
/// cannot be configured.
|
||||
initialize_semaphore: Semaphore,
|
||||
}
|
||||
|
||||
impl InitStatus {
|
||||
/// Create new "not initialized" status.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
server_id: Default::default(),
|
||||
initialized: AtomicBool::new(false),
|
||||
// Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`.
|
||||
initialize_semaphore: Semaphore::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Base location in object store for this writer.
|
||||
pub fn root_path(&self, store: &ObjectStore) -> Result<Path> {
|
||||
let id = self.server_id.get()?;
|
||||
|
||||
let mut path = store.new_path();
|
||||
path.push_dir(format!("{}", id));
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
/// Check if server is loaded. Databases are loaded and server is ready to read/write.
|
||||
pub fn initialized(&self) -> bool {
|
||||
// ordering here isn't that important since this method is not used to check-and-modify the flag
|
||||
self.initialized.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Loads the database configurations based on the databases in the
|
||||
/// object store. Any databases in the config already won't be
|
||||
/// replaced.
|
||||
///
|
||||
/// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready.
|
||||
pub(crate) async fn maybe_initialize_server(
|
||||
&self,
|
||||
store: Arc<ObjectStore>,
|
||||
config: Arc<Config>,
|
||||
exec: Arc<Executor>,
|
||||
) -> Result<()> {
|
||||
let _guard = self
|
||||
.initialize_semaphore
|
||||
.acquire()
|
||||
.await
|
||||
.expect("semaphore should not be closed");
|
||||
|
||||
// Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread
|
||||
// that enters this semaphore after we've left actually sees the correct `is_ready` flag.
|
||||
if self.initialized.load(Ordering::Acquire) {
|
||||
// already loaded, so do nothing
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// get the database names from the object store prefixes
|
||||
// TODO: update object store to pull back all common prefixes by
|
||||
// following the next tokens.
|
||||
let list_result = store
|
||||
.list_with_delimiter(&self.root_path(&store)?)
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
|
||||
let server_id = self.server_id.get()?;
|
||||
|
||||
let handles: Vec<_> = list_result
|
||||
.common_prefixes
|
||||
.into_iter()
|
||||
.map(|mut path| {
|
||||
let store = Arc::clone(&store);
|
||||
let config = Arc::clone(&config);
|
||||
let exec = Arc::clone(&exec);
|
||||
|
||||
path.set_file_name(DB_RULES_FILE_NAME);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(e) =
|
||||
Self::load_database_config(server_id, store, config, exec, path).await
|
||||
{
|
||||
error!(%e, "cannot load database");
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
futures::future::join_all(handles).await;
|
||||
|
||||
// mark as ready (use correct ordering for Acquire-Release)
|
||||
self.initialized.store(true, Ordering::Release);
|
||||
info!("loaded databases, server is initalized");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_database_config(
|
||||
server_id: ServerId,
|
||||
store: Arc<ObjectStore>,
|
||||
config: Arc<Config>,
|
||||
exec: Arc<Executor>,
|
||||
path: Path,
|
||||
) -> Result<()> {
|
||||
let serialized_rules = loop {
|
||||
match get_database_config_bytes(&path, &store).await {
|
||||
Ok(data) => break data,
|
||||
Err(e) => {
|
||||
if let Error::NoDatabaseConfigError { location } = &e {
|
||||
warn!(?location, "{}", e);
|
||||
return Ok(());
|
||||
}
|
||||
error!(
|
||||
"error getting database config {:?} from object store: {}",
|
||||
path, e
|
||||
);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
};
|
||||
let rules = decode_database_rules(serialized_rules.freeze())
|
||||
.context(ErrorDeserializingRulesProtobuf)?;
|
||||
|
||||
let preserved_catalog = load_or_create_preserved_catalog(
|
||||
rules.db_name(),
|
||||
Arc::clone(&store),
|
||||
server_id,
|
||||
config.metrics_registry(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogLoadError)?;
|
||||
|
||||
let handle = config
|
||||
.create_db(rules)
|
||||
.map_err(Box::new)
|
||||
.context(CreateDbError)?;
|
||||
handle.commit(server_id, store, exec, preserved_catalog);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// get bytes from the location in object store
|
||||
async fn get_store_bytes(
|
||||
location: &object_store::path::Path,
|
||||
store: &ObjectStore,
|
||||
) -> Result<bytes::BytesMut> {
|
||||
let b = store
|
||||
.get(location)
|
||||
.await
|
||||
.context(StoreError)?
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
|
||||
Ok(b)
|
||||
}
|
||||
|
||||
// get the bytes for the database rule config file, if it exists,
|
||||
// otherwise it returns none.
|
||||
async fn get_database_config_bytes(
|
||||
location: &object_store::path::Path,
|
||||
store: &ObjectStore,
|
||||
) -> Result<bytes::BytesMut> {
|
||||
let list_result = store
|
||||
.list_with_delimiter(location)
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
if list_result.objects.is_empty() {
|
||||
return NoDatabaseConfigError {
|
||||
location: location.clone(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
get_store_bytes(location, store).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use object_store::{memory::InMemory, path::ObjectStorePath};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_database_config_bytes() {
|
||||
let object_store = ObjectStore::new_in_memory(InMemory::new());
|
||||
let mut rules_path = object_store.new_path();
|
||||
rules_path.push_all_dirs(&["1", "foo_bar"]);
|
||||
rules_path.set_file_name("rules.pb");
|
||||
|
||||
let res = get_database_config_bytes(&rules_path, &object_store)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(res, Error::NoDatabaseConfigError { .. }));
|
||||
}
|
||||
}
|
|
@ -68,16 +68,13 @@
|
|||
)]
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::BytesMut;
|
||||
use cached::proc_macro::cached;
|
||||
use db::load_or_create_preserved_catalog;
|
||||
use futures::stream::TryStreamExt;
|
||||
use object_store::path::Path;
|
||||
use init::InitStatus;
|
||||
use observability_deps::tracing::{debug, error, info, warn};
|
||||
use parking_lot::Mutex;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
@ -90,27 +87,24 @@ use data_types::{
|
|||
};
|
||||
use entry::{lines_to_sharded_entries, Entry, ShardedEntry};
|
||||
use influxdb_line_protocol::ParsedLine;
|
||||
use internal_types::once::OnceNonZeroU32;
|
||||
use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry};
|
||||
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||
use object_store::{ObjectStore, ObjectStoreApi};
|
||||
use query::{exec::Executor, DatabaseStore};
|
||||
use tokio::sync::Semaphore;
|
||||
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
|
||||
|
||||
pub use crate::config::RemoteTemplate;
|
||||
use crate::config::{
|
||||
object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME,
|
||||
};
|
||||
use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString};
|
||||
use cached::Return;
|
||||
use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId};
|
||||
pub use db::Db;
|
||||
use generated_types::database_rules::{decode_database_rules, encode_database_rules};
|
||||
use generated_types::database_rules::encode_database_rules;
|
||||
use influxdb_iox_client::{connection::Builder, write};
|
||||
use rand::seq::SliceRandom;
|
||||
use std::collections::HashMap;
|
||||
|
||||
mod config;
|
||||
pub mod db;
|
||||
mod init;
|
||||
mod write_buffer;
|
||||
|
||||
/// Utility modules used by benchmarks and tests
|
||||
|
@ -147,12 +141,6 @@ pub enum Error {
|
|||
#[snafu(display("error replicating to remote: {}", source))]
|
||||
ErrorReplicating { source: DatabaseError },
|
||||
|
||||
#[snafu(display("id already set"))]
|
||||
IdAlreadySet { id: ServerId },
|
||||
|
||||
#[snafu(display("unable to use server until id is set"))]
|
||||
IdNotSet,
|
||||
|
||||
#[snafu(display(
|
||||
"server ID is set but DBs are not yet loaded. Server is not yet ready to read/write data."
|
||||
))]
|
||||
|
@ -163,23 +151,12 @@ pub enum Error {
|
|||
source: generated_types::database_rules::EncodeError,
|
||||
},
|
||||
|
||||
#[snafu(display("error deserializing database rules from protobuf: {}", source))]
|
||||
ErrorDeserializingRulesProtobuf {
|
||||
source: generated_types::database_rules::DecodeError,
|
||||
},
|
||||
|
||||
#[snafu(display("error deserializing configuration {}", source))]
|
||||
ErrorDeserializing { source: serde_json::Error },
|
||||
|
||||
#[snafu(display("store error: {}", source))]
|
||||
StoreError { source: object_store::Error },
|
||||
|
||||
#[snafu(display(
|
||||
"no database configuration present in directory that contains data: {:?}",
|
||||
location
|
||||
))]
|
||||
NoDatabaseConfigError { location: object_store::path::Path },
|
||||
|
||||
#[snafu(display("database already exists"))]
|
||||
DatabaseAlreadyExists { db_name: String },
|
||||
|
||||
|
@ -210,6 +187,12 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("cannot load catalog: {}", source))]
|
||||
CatalogLoadError { source: DatabaseError },
|
||||
|
||||
#[snafu(display("cannot set id: {}", source))]
|
||||
SetIdError { source: crate::init::Error },
|
||||
|
||||
#[snafu(display("cannot get id: {}", source))]
|
||||
GetIdError { source: crate::init::Error },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -246,8 +229,6 @@ impl JobRegistry {
|
|||
}
|
||||
}
|
||||
|
||||
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
|
||||
|
||||
/// Used to configure a server instance
|
||||
#[derive(Debug)]
|
||||
pub struct ServerConfig {
|
||||
|
@ -373,27 +354,11 @@ impl ServerMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct CurrentServerId(OnceNonZeroU32);
|
||||
|
||||
impl CurrentServerId {
|
||||
fn set(&self, id: ServerId) -> Result<()> {
|
||||
self.0.set(id.get()).map_err(|id| Error::IdAlreadySet {
|
||||
id: ServerId::new(id),
|
||||
})
|
||||
}
|
||||
|
||||
fn get(&self) -> Result<ServerId> {
|
||||
self.0.get().map(ServerId::new).context(IdNotSet)
|
||||
}
|
||||
}
|
||||
|
||||
/// `Server` is the container struct for how servers store data internally, as
|
||||
/// well as how they communicate with other servers. Each server will have one
|
||||
/// of these structs, which keeps track of all replication and query rules.
|
||||
#[derive(Debug)]
|
||||
pub struct Server<M: ConnectionManager> {
|
||||
id: CurrentServerId,
|
||||
config: Arc<Config>,
|
||||
connection_manager: Arc<M>,
|
||||
pub store: Arc<ObjectStore>,
|
||||
|
@ -406,17 +371,7 @@ pub struct Server<M: ConnectionManager> {
|
|||
/// and populates the endpoint with this data.
|
||||
pub registry: Arc<metrics::MetricRegistry>,
|
||||
|
||||
/// Flags that databases are loaded and server is ready to read/write data.
|
||||
initialized: AtomicBool,
|
||||
|
||||
/// Semaphore that limits the number of jobs that load DBs when the serverID is set.
|
||||
///
|
||||
/// Note that this semaphore is more of a "lock" than an arbitrary semaphore. All the other sync structures (mutex,
|
||||
/// rwlock) require something to be wrapped which we don't have in our case, so we're using a semaphore here. We
|
||||
/// want exactly 1 background worker to mess with the server init / DB loading, otherwise everything in the critical
|
||||
/// section (in [`maybe_initialize_server`](Self::maybe_initialize_server)) will break apart. So this semaphore
|
||||
/// cannot be configured.
|
||||
initialize_semaphore: Semaphore,
|
||||
init_status: InitStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -445,7 +400,6 @@ impl<M: ConnectionManager> Server<M> {
|
|||
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
|
||||
|
||||
Self {
|
||||
id: Default::default(),
|
||||
config: Arc::new(Config::new(
|
||||
Arc::clone(&jobs),
|
||||
Arc::clone(&metric_registry),
|
||||
|
@ -457,9 +411,7 @@ impl<M: ConnectionManager> Server<M> {
|
|||
jobs,
|
||||
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
|
||||
registry: Arc::clone(&metric_registry),
|
||||
initialized: AtomicBool::new(false),
|
||||
// Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`.
|
||||
initialize_semaphore: Semaphore::new(1),
|
||||
init_status: InitStatus::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -468,18 +420,17 @@ impl<M: ConnectionManager> Server<M> {
|
|||
///
|
||||
/// A valid server ID Must be non-zero.
|
||||
pub fn set_id(&self, id: ServerId) -> Result<()> {
|
||||
self.id.set(id)
|
||||
self.init_status.server_id.set(id).context(SetIdError)
|
||||
}
|
||||
|
||||
/// Returns the current server ID, or an error if not yet set.
|
||||
pub fn require_id(&self) -> Result<ServerId> {
|
||||
self.id.get()
|
||||
self.init_status.server_id.get().context(GetIdError)
|
||||
}
|
||||
|
||||
/// Check if server is loaded. Databases are loaded and server is ready to read/write.
|
||||
pub fn initialized(&self) -> bool {
|
||||
// ordering here isn't that important since this method is not used to check-and-modify the flag
|
||||
self.initialized.load(Ordering::Relaxed)
|
||||
self.init_status.initialized()
|
||||
}
|
||||
|
||||
/// Require that server is loaded. Databases are loaded and server is ready to read/write.
|
||||
|
@ -524,7 +475,13 @@ impl<M: ConnectionManager> Server<M> {
|
|||
}
|
||||
|
||||
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
|
||||
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
|
||||
let location = object_store_path_for_database_config(
|
||||
&self
|
||||
.init_status
|
||||
.root_path(&self.store)
|
||||
.context(GetIdError)?,
|
||||
&rules.name,
|
||||
);
|
||||
|
||||
let mut data = BytesMut::new();
|
||||
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
|
||||
|
@ -543,115 +500,23 @@ impl<M: ConnectionManager> Server<M> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
// base location in object store for this writer
|
||||
fn root_path(&self) -> Result<object_store::path::Path> {
|
||||
let id = self.require_id()?;
|
||||
|
||||
let mut path = self.store.new_path();
|
||||
path.push_dir(format!("{}", id));
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
/// Loads the database configurations based on the databases in the
|
||||
/// object store. Any databases in the config already won't be
|
||||
/// replaced.
|
||||
///
|
||||
/// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready.
|
||||
pub async fn maybe_initialize_server(&self) -> Result<()> {
|
||||
let _guard = self
|
||||
.initialize_semaphore
|
||||
.acquire()
|
||||
pub async fn maybe_initialize_server(&self) {
|
||||
if let Err(e) = self
|
||||
.init_status
|
||||
.maybe_initialize_server(
|
||||
Arc::clone(&self.store),
|
||||
Arc::clone(&self.config),
|
||||
Arc::clone(&self.exec),
|
||||
)
|
||||
.await
|
||||
.expect("semaphore should not be closed");
|
||||
|
||||
// Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread
|
||||
// that enters this semaphore after we've left actually sees the correct `is_ready` flag.
|
||||
if self.initialized.load(Ordering::Acquire) {
|
||||
// already loaded, so do nothing
|
||||
return Ok(());
|
||||
{
|
||||
error!(%e, "error during DB loading");
|
||||
}
|
||||
|
||||
// get the database names from the object store prefixes
|
||||
// TODO: update object store to pull back all common prefixes by
|
||||
// following the next tokens.
|
||||
let list_result = self
|
||||
.store
|
||||
.list_with_delimiter(&self.root_path()?)
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
|
||||
let server_id = self.require_id()?;
|
||||
|
||||
let handles: Vec<_> = list_result
|
||||
.common_prefixes
|
||||
.into_iter()
|
||||
.map(|mut path| {
|
||||
let store = Arc::clone(&self.store);
|
||||
let config = Arc::clone(&self.config);
|
||||
let exec = Arc::clone(&self.exec);
|
||||
|
||||
path.set_file_name(DB_RULES_FILE_NAME);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(e) =
|
||||
Self::load_database_config(server_id, store, config, exec, path).await
|
||||
{
|
||||
error!(%e, "cannot load database");
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
futures::future::join_all(handles).await;
|
||||
|
||||
// mark as ready (use correct ordering for Acquire-Release)
|
||||
self.initialized.store(true, Ordering::Release);
|
||||
info!("loaded databases, server is initalized");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_database_config(
|
||||
server_id: ServerId,
|
||||
store: Arc<ObjectStore>,
|
||||
config: Arc<Config>,
|
||||
exec: Arc<Executor>,
|
||||
path: Path,
|
||||
) -> Result<()> {
|
||||
let serialized_rules = loop {
|
||||
match get_database_config_bytes(&path, &store).await {
|
||||
Ok(data) => break data,
|
||||
Err(e) => {
|
||||
if let Error::NoDatabaseConfigError { location } = &e {
|
||||
warn!(?location, "{}", e);
|
||||
return Ok(());
|
||||
}
|
||||
error!(
|
||||
"error getting database config {:?} from object store: {}",
|
||||
path, e
|
||||
);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
};
|
||||
let rules = decode_database_rules(serialized_rules.freeze())
|
||||
.context(ErrorDeserializingRulesProtobuf)?;
|
||||
|
||||
let preserved_catalog = load_or_create_preserved_catalog(
|
||||
rules.db_name(),
|
||||
Arc::clone(&store),
|
||||
server_id,
|
||||
config.metrics_registry(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogLoadError)?;
|
||||
|
||||
let handle = config.create_db(rules)?;
|
||||
handle.commit(server_id, store, exec, preserved_catalog);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `write_lines` takes in raw line protocol and converts it to a collection
|
||||
|
@ -940,9 +805,7 @@ impl<M: ConnectionManager> Server<M> {
|
|||
|
||||
while !shutdown.is_cancelled() {
|
||||
if self.require_id().is_ok() {
|
||||
if let Err(e) = self.maybe_initialize_server().await {
|
||||
error!(%e, "error during DB loading");
|
||||
}
|
||||
self.maybe_initialize_server().await;
|
||||
}
|
||||
|
||||
self.jobs.inner.lock().reclaim();
|
||||
|
@ -1104,42 +967,6 @@ impl RemoteServer for RemoteServerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
// get bytes from the location in object store
|
||||
async fn get_store_bytes(
|
||||
location: &object_store::path::Path,
|
||||
store: &ObjectStore,
|
||||
) -> Result<bytes::BytesMut> {
|
||||
let b = store
|
||||
.get(location)
|
||||
.await
|
||||
.context(StoreError)?
|
||||
.map_ok(|b| bytes::BytesMut::from(&b[..]))
|
||||
.try_concat()
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
|
||||
Ok(b)
|
||||
}
|
||||
|
||||
// get the bytes for the database rule config file, if it exists,
|
||||
// otherwise it returns none.
|
||||
async fn get_database_config_bytes(
|
||||
location: &object_store::path::Path,
|
||||
store: &ObjectStore,
|
||||
) -> Result<bytes::BytesMut> {
|
||||
let list_result = store
|
||||
.list_with_delimiter(location)
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
if list_result.objects.is_empty() {
|
||||
return NoDatabaseConfigError {
|
||||
location: location.clone(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
get_store_bytes(location, store).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
|
@ -1150,6 +977,7 @@ mod tests {
|
|||
|
||||
use async_trait::async_trait;
|
||||
use futures::TryStreamExt;
|
||||
use generated_types::database_rules::decode_database_rules;
|
||||
use snafu::Snafu;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
@ -1193,33 +1021,30 @@ mod tests {
|
|||
config_with_metric_registry_and_store(object_store).1
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_database_config_bytes() {
|
||||
let object_store = ObjectStore::new_in_memory(InMemory::new());
|
||||
let mut rules_path = object_store.new_path();
|
||||
rules_path.push_all_dirs(&["1", "foo_bar"]);
|
||||
rules_path.set_file_name("rules.pb");
|
||||
|
||||
let res = get_database_config_bytes(&rules_path, &object_store)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(res, Error::NoDatabaseConfigError { .. }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn server_api_calls_return_error_with_no_id_set() {
|
||||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config());
|
||||
|
||||
let resp = server.require_id().unwrap_err();
|
||||
assert!(matches!(resp, Error::IdNotSet));
|
||||
assert!(matches!(
|
||||
resp,
|
||||
Error::GetIdError {
|
||||
source: crate::init::Error::IdNotSet
|
||||
}
|
||||
));
|
||||
|
||||
let lines = parsed_lines("cpu foo=1 10");
|
||||
let resp = server
|
||||
.write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(resp, Error::IdNotSet));
|
||||
assert!(matches!(
|
||||
resp,
|
||||
Error::GetIdError {
|
||||
source: crate::init::Error::IdNotSet
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -1229,7 +1054,7 @@ mod tests {
|
|||
let store = config.store();
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let name = DatabaseName::new("bananas").unwrap();
|
||||
|
||||
|
@ -1282,7 +1107,7 @@ mod tests {
|
|||
.with_num_worker_threads(1);
|
||||
let server2 = Server::new(manager, config2);
|
||||
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server2.maybe_initialize_server().await.unwrap();
|
||||
server2.maybe_initialize_server().await;
|
||||
|
||||
let _ = server2.db(&db2).unwrap();
|
||||
let _ = server2.db(&name).unwrap();
|
||||
|
@ -1295,7 +1120,7 @@ mod tests {
|
|||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let name = DatabaseName::new("bananas").unwrap();
|
||||
|
||||
|
@ -1346,7 +1171,7 @@ mod tests {
|
|||
let config = config_with_store(store);
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
create_simple_database(&server, "bananas")
|
||||
.await
|
||||
.expect("failed to create database");
|
||||
|
@ -1362,7 +1187,7 @@ mod tests {
|
|||
let config = config_with_store(store);
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.expect("load config");
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
create_simple_database(&server, "apples")
|
||||
.await
|
||||
|
@ -1382,7 +1207,7 @@ mod tests {
|
|||
let config = config_with_store(store);
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.expect("load config");
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
assert_eq!(server.db_names_sorted(), vec!["apples"]);
|
||||
}
|
||||
|
@ -1392,7 +1217,7 @@ mod tests {
|
|||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.expect("load config");
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let names = vec!["bar", "baz"];
|
||||
|
||||
|
@ -1413,7 +1238,7 @@ mod tests {
|
|||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
|
@ -1454,7 +1279,7 @@ mod tests {
|
|||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
|
@ -1541,7 +1366,7 @@ mod tests {
|
|||
|
||||
let server = Server::new(manager, config());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
|
@ -1622,7 +1447,7 @@ mod tests {
|
|||
let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone());
|
||||
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
|
@ -1776,7 +1601,7 @@ mod tests {
|
|||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await.unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
|
@ -1834,7 +1659,12 @@ mod tests {
|
|||
let err = create_simple_database(&server, "bananas")
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, Error::IdNotSet));
|
||||
assert!(matches!(
|
||||
err,
|
||||
Error::GetIdError {
|
||||
source: crate::init::Error::IdNotSet
|
||||
}
|
||||
));
|
||||
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
// do NOT call `server.maybe_load_database_configs` so DBs are not loaded and server is not ready
|
||||
|
|
|
@ -899,7 +899,7 @@ mod tests {
|
|||
let (_, config) = config();
|
||||
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
|
||||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.maybe_initialize_server().await.unwrap();
|
||||
app_server.maybe_initialize_server().await;
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
|
@ -947,7 +947,7 @@ mod tests {
|
|||
let (metrics_registry, config) = config();
|
||||
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
|
||||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.maybe_initialize_server().await.unwrap();
|
||||
app_server.maybe_initialize_server().await;
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MetricsOrg_MetricsBucket").unwrap(),
|
||||
|
@ -1037,7 +1037,7 @@ mod tests {
|
|||
let (_, config) = config();
|
||||
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
|
||||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.maybe_initialize_server().await.unwrap();
|
||||
app_server.maybe_initialize_server().await;
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
|
@ -1174,7 +1174,7 @@ mod tests {
|
|||
let (_, config) = config();
|
||||
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
|
||||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.maybe_initialize_server().await.unwrap();
|
||||
app_server.maybe_initialize_server().await;
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
|
@ -1223,7 +1223,7 @@ mod tests {
|
|||
let (_, config) = config();
|
||||
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
|
||||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.maybe_initialize_server().await.unwrap();
|
||||
app_server.maybe_initialize_server().await;
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
|
|
|
@ -8,7 +8,7 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
|
|||
use server::Error;
|
||||
|
||||
match error {
|
||||
Error::IdNotSet => PreconditionViolation {
|
||||
Error::GetIdError { .. } => PreconditionViolation {
|
||||
category: "Writer ID".to_string(),
|
||||
subject: "influxdata.com/iox".to_string(),
|
||||
description: "Writer ID must be set".to_string(),
|
||||
|
|
|
@ -71,7 +71,7 @@ where
|
|||
|
||||
match self.server.set_id(id) {
|
||||
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
|
||||
Err(e @ Error::IdAlreadySet { .. }) => {
|
||||
Err(e @ Error::SetIdError { .. }) => {
|
||||
return Err(FieldViolation {
|
||||
field: "id".to_string(),
|
||||
description: e.to_string(),
|
||||
|
|
Loading…
Reference in New Issue