fix: improve error messages with more context (#455)
parent
831a0875d6
commit
597933622d
|
@ -1,9 +1,9 @@
|
|||
use tracing::{debug, info};
|
||||
|
||||
use std::env::VarError;
|
||||
use std::fs;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::{env::VarError, path::PathBuf};
|
||||
|
||||
use crate::server::http_routes;
|
||||
use crate::server::rpc::storage;
|
||||
|
@ -13,7 +13,50 @@ use hyper::service::{make_service_fn, service_fn};
|
|||
use hyper::Server;
|
||||
use write_buffer::{Db, WriteBufferDatabases};
|
||||
|
||||
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Unable to create database directory {:?}: {}", path, source))]
|
||||
CreatingDatabaseDirectory {
|
||||
path: PathBuf,
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unable to initialize database in directory {:?}: {}", db_dir, source))]
|
||||
InitializingWriteBuffer {
|
||||
db_dir: PathBuf,
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
|
||||
#[snafu(display("Unable to restore WAL from directory {:?}: {}", dir, source))]
|
||||
RestoringWriteBuffer {
|
||||
dir: PathBuf,
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Unable to bind to listen for HTTP requests on {}: {}",
|
||||
bind_addr,
|
||||
source
|
||||
))]
|
||||
StartListening {
|
||||
bind_addr: SocketAddr,
|
||||
source: hyper::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Error serving HTTP: {}", source))]
|
||||
ServingHttp { source: hyper::error::Error },
|
||||
|
||||
#[snafu(display("Error serving RPC: {}", source))]
|
||||
ServingRPC {
|
||||
source: crate::server::rpc::storage::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
pub async fn main() -> Result<()> {
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
let db_dir = match std::env::var("INFLUXDB_IOX_DB_DIR") {
|
||||
|
@ -25,16 +68,23 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
path.into_os_string().into_string().unwrap()
|
||||
}
|
||||
};
|
||||
fs::create_dir_all(&db_dir)?;
|
||||
|
||||
fs::create_dir_all(&db_dir).context(CreatingDatabaseDirectory { path: &db_dir })?;
|
||||
|
||||
debug!("InfluxDB IOx Server using database directory: {:?}", db_dir);
|
||||
|
||||
let storage = Arc::new(WriteBufferDatabases::new(&db_dir));
|
||||
let dirs = storage.wal_dirs()?;
|
||||
let dirs = storage
|
||||
.wal_dirs()
|
||||
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
|
||||
.context(InitializingWriteBuffer { db_dir })?;
|
||||
|
||||
// TODO: make recovery of multiple databases multi-threaded
|
||||
for dir in dirs {
|
||||
let db = Db::restore_from_wal(dir).await?;
|
||||
let db = Db::restore_from_wal(&dir)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
|
||||
.context(RestoringWriteBuffer { dir })?;
|
||||
storage.add_db(db).await;
|
||||
}
|
||||
|
||||
|
@ -79,7 +129,9 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
}
|
||||
});
|
||||
|
||||
let server = Server::try_bind(&bind_addr)?.serve(make_svc);
|
||||
let server = Server::try_bind(&bind_addr)
|
||||
.context(StartListening { bind_addr })?
|
||||
.serve(make_svc);
|
||||
info!("Listening on http://{}", bind_addr);
|
||||
|
||||
println!("InfluxDB IOx server ready");
|
||||
|
@ -87,8 +139,8 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
// Wait for both the servers to complete
|
||||
let (grpc_server, server) = futures::future::join(grpc_server, server).await;
|
||||
|
||||
grpc_server?;
|
||||
server?;
|
||||
grpc_server.context(ServingRPC)?;
|
||||
server.context(ServingHttp)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -187,7 +187,7 @@ async fn dispatch_args(matches: ArgMatches<'_>) {
|
|||
match commands::write_buffer_server::main().await {
|
||||
Ok(()) => eprintln!("Shutdown OK"),
|
||||
Err(e) => {
|
||||
error!("Server shutdown with error: {:?}", e);
|
||||
error!("Server shutdown with error: {}", e);
|
||||
std::process::exit(ReturnCode::ServerExitedAbnormally as _);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,13 @@ use crate::column::Column;
|
|||
use crate::partition::Partition;
|
||||
use crate::{partition::PartitionPredicate, table::Table};
|
||||
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::io::ErrorKind;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use arrow_deps::{
|
||||
arrow,
|
||||
|
@ -273,7 +276,7 @@ impl Db {
|
|||
|
||||
/// Create a new DB and initially restore pre-existing data in the
|
||||
/// Write Ahead Log (WAL) directory `wal_dir`
|
||||
pub async fn restore_from_wal(wal_dir: PathBuf) -> Result<Self> {
|
||||
pub async fn restore_from_wal(wal_dir: &Path) -> Result<Self> {
|
||||
let now = std::time::Instant::now();
|
||||
let name = wal_dir
|
||||
.iter()
|
||||
|
@ -283,7 +286,7 @@ impl Db {
|
|||
.with_context(|| OpenDb { dir: &wal_dir })?
|
||||
.to_string();
|
||||
|
||||
let wal_builder = WalBuilder::new(wal_dir.clone());
|
||||
let wal_builder = WalBuilder::new(wal_dir);
|
||||
let wal_details = start_wal_sync_task(wal_builder.clone())
|
||||
.await
|
||||
.context(OpeningWal { database: &name })?;
|
||||
|
@ -1322,7 +1325,7 @@ mod tests {
|
|||
|
||||
// check that it recovers from the wal
|
||||
{
|
||||
let db = Db::restore_from_wal(dir).await?;
|
||||
let db = Db::restore_from_wal(&dir).await?;
|
||||
|
||||
let partitions = db.table_to_arrow("cpu", cpu_columns).await?;
|
||||
assert_table_eq(expected_cpu_table, &partitions);
|
||||
|
|
Loading…
Reference in New Issue