Merge pull request #1606 from influxdata/crepererum/refactor_db_loading
refactor: isolate DB loading and streamline error handlingpull/24376/head
commit
6f8e771033
|
@ -75,6 +75,7 @@ use bytes::BytesMut;
|
|||
use cached::proc_macro::cached;
|
||||
use db::load_or_create_preserved_catalog;
|
||||
use futures::stream::TryStreamExt;
|
||||
use object_store::path::Path;
|
||||
use observability_deps::tracing::{debug, error, info, warn};
|
||||
use parking_lot::Mutex;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
@ -124,60 +125,87 @@ type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|||
pub enum Error {
|
||||
#[snafu(display("Server error: {}", source))]
|
||||
ServerError { source: std::io::Error },
|
||||
|
||||
#[snafu(display("database not found: {}", db_name))]
|
||||
DatabaseNotFound { db_name: String },
|
||||
|
||||
#[snafu(display("invalid database: {}", source))]
|
||||
InvalidDatabaseName { source: DatabaseNameError },
|
||||
|
||||
#[snafu(display("database error: {}", source))]
|
||||
UnknownDatabaseError { source: DatabaseError },
|
||||
|
||||
#[snafu(display("getting mutable buffer chunk: {}", source))]
|
||||
MutableBufferChunk { source: DatabaseError },
|
||||
|
||||
#[snafu(display("no local buffer for database: {}", db))]
|
||||
NoLocalBuffer { db: String },
|
||||
|
||||
#[snafu(display("unable to get connection to remote server: {}", server))]
|
||||
UnableToGetConnection {
|
||||
server: String,
|
||||
source: DatabaseError,
|
||||
},
|
||||
|
||||
#[snafu(display("error replicating to remote: {}", source))]
|
||||
ErrorReplicating { source: DatabaseError },
|
||||
|
||||
#[snafu(display("id already set"))]
|
||||
IdAlreadySet { id: ServerId },
|
||||
|
||||
#[snafu(display("unable to use server until id is set"))]
|
||||
IdNotSet,
|
||||
#[snafu(display("error serializing configuration {}", source))]
|
||||
ErrorSerializing {
|
||||
|
||||
#[snafu(display("error serializing database rules to protobuf: {}", source))]
|
||||
ErrorSerializingRulesProtobuf {
|
||||
source: generated_types::database_rules::EncodeError,
|
||||
},
|
||||
|
||||
#[snafu(display("error deserializing database rules from protobuf: {}", source))]
|
||||
ErrorDeserializingRulesProtobuf {
|
||||
source: generated_types::database_rules::DecodeError,
|
||||
},
|
||||
|
||||
#[snafu(display("error deserializing configuration {}", source))]
|
||||
ErrorDeserializing { source: serde_json::Error },
|
||||
|
||||
#[snafu(display("store error: {}", source))]
|
||||
StoreError { source: object_store::Error },
|
||||
|
||||
#[snafu(display(
|
||||
"no database configuration present in directory that contains data: {:?}",
|
||||
location
|
||||
))]
|
||||
NoDatabaseConfigError { location: object_store::path::Path },
|
||||
|
||||
#[snafu(display("database already exists"))]
|
||||
DatabaseAlreadyExists { db_name: String },
|
||||
|
||||
#[snafu(display("error converting line protocol to flatbuffers: {}", source))]
|
||||
LineConversion { source: entry::Error },
|
||||
|
||||
#[snafu(display("error decoding entry flatbuffers: {}", source))]
|
||||
DecodingEntry {
|
||||
source: flatbuffers::InvalidFlatbuffer,
|
||||
},
|
||||
|
||||
#[snafu(display("shard not found: {}", shard_id))]
|
||||
ShardNotFound { shard_id: ShardId },
|
||||
|
||||
#[snafu(display("hard buffer limit reached"))]
|
||||
HardLimitReached {},
|
||||
|
||||
#[snafu(display("no remote configured for node group: {:?}", node_group))]
|
||||
NoRemoteConfigured { node_group: NodeGroup },
|
||||
|
||||
#[snafu(display("all remotes failed connecting: {:?}", errors))]
|
||||
NoRemoteReachable {
|
||||
errors: HashMap<GRpcConnectionString, ConnectionManagerError>,
|
||||
},
|
||||
|
||||
#[snafu(display("remote error: {}", source))]
|
||||
RemoteError { source: ConnectionManagerError },
|
||||
|
||||
#[snafu(display("cannot load catalog: {}", source))]
|
||||
CatalogLoadError { source: DatabaseError },
|
||||
}
|
||||
|
@ -463,7 +491,7 @@ impl<M: ConnectionManager> Server<M> {
|
|||
let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
|
||||
|
||||
let mut data = BytesMut::new();
|
||||
encode_database_rules(rules, &mut data).context(ErrorSerializing)?;
|
||||
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
|
||||
|
||||
let len = data.len();
|
||||
|
||||
|
@ -514,49 +542,11 @@ impl<M: ConnectionManager> Server<M> {
|
|||
path.set_file_name(DB_RULES_FILE_NAME);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let mut res = get_database_config_bytes(&path, &store).await;
|
||||
while let Err(e) = &res {
|
||||
if let Error::NoDatabaseConfigError { location } = e {
|
||||
warn!(?location, "{}", e);
|
||||
return;
|
||||
}
|
||||
error!(
|
||||
"error getting database config {:?} from object store: {}",
|
||||
path, e
|
||||
);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(
|
||||
STORE_ERROR_PAUSE_SECONDS,
|
||||
))
|
||||
.await;
|
||||
res = get_database_config_bytes(&path, &store).await;
|
||||
if let Err(e) =
|
||||
Self::load_database_config(server_id, store, config, exec, path).await
|
||||
{
|
||||
error!(%e, "cannot load database");
|
||||
}
|
||||
|
||||
let res = res.unwrap(); // it's not an error, otherwise we'd be still looping above
|
||||
let res = res.freeze();
|
||||
|
||||
match decode_database_rules(res) {
|
||||
Err(e) => {
|
||||
error!("error parsing database config {:?} from store: {}", path, e)
|
||||
}
|
||||
Ok(rules) => {
|
||||
match load_or_create_preserved_catalog(
|
||||
rules.db_name(),
|
||||
Arc::clone(&store),
|
||||
server_id,
|
||||
config.metrics_registry(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => error!("cannot load database: {}", e),
|
||||
Ok(preserved_catalog) => match config.create_db(rules) {
|
||||
Err(e) => error!("error adding database to config: {}", e),
|
||||
Ok(handle) => {
|
||||
handle.commit(server_id, store, exec, preserved_catalog)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
@ -566,6 +556,49 @@ impl<M: ConnectionManager> Server<M> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_database_config(
|
||||
server_id: ServerId,
|
||||
store: Arc<ObjectStore>,
|
||||
config: Arc<Config>,
|
||||
exec: Arc<Executor>,
|
||||
path: Path,
|
||||
) -> Result<()> {
|
||||
let serialized_rules = loop {
|
||||
match get_database_config_bytes(&path, &store).await {
|
||||
Ok(data) => break data,
|
||||
Err(e) => {
|
||||
if let Error::NoDatabaseConfigError { location } = &e {
|
||||
warn!(?location, "{}", e);
|
||||
return Ok(());
|
||||
}
|
||||
error!(
|
||||
"error getting database config {:?} from object store: {}",
|
||||
path, e
|
||||
);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
};
|
||||
let rules = decode_database_rules(serialized_rules.freeze())
|
||||
.context(ErrorDeserializingRulesProtobuf)?;
|
||||
|
||||
let preserved_catalog = load_or_create_preserved_catalog(
|
||||
rules.db_name(),
|
||||
Arc::clone(&store),
|
||||
server_id,
|
||||
config.metrics_registry(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogLoadError)?;
|
||||
|
||||
let handle = config.create_db(rules)?;
|
||||
handle.commit(server_id, store, exec, preserved_catalog);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// `write_lines` takes in raw line protocol and converts it to a collection
|
||||
/// of ShardedEntry which are then sent to other IOx servers based on
|
||||
/// the ShardConfig or sent to the local database for buffering in the
|
||||
|
|
Loading…
Reference in New Issue