Merge pull request #2853 from influxdata/cn/only-read-server

feat: Only read server config to know which databases to init, don't fall back to object storage
pull/24376/head
kodiakhq[bot] 2021-10-25 15:22:19 +00:00 committed by GitHub
commit c85d35ea15
8 changed files with 256 additions and 174 deletions
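The behavioral change in this PR: on startup the server now reads `<server_id>/config.pb` from object storage to learn which databases it owns, and a missing config file is treated as an empty config rather than triggering a fallback scan of database directories. A minimal sketch of that flow follows; `StoreError`, `fetch_config_bytes`, `decode_config`, and `ServerConfig` are simplified stand-ins for illustration, not the real IOx types. The real caller is `maybe_initialize_server` in the server crate diff further down.

// Sketch only: simplified stand-ins, not the actual IOx API.
#[derive(Debug)]
enum StoreError {
    NotFound { location: String },
    Other(String),
}

#[derive(Debug, Default)]
struct ServerConfig {
    // database name -> location in object storage
    databases: std::collections::BTreeMap<String, String>,
}

// Pretend read of `<server_id>/config.pb` from object storage.
fn fetch_config_bytes(server_id: u32) -> Result<Vec<u8>, StoreError> {
    Err(StoreError::NotFound {
        location: format!("{}/config.pb", server_id),
    })
}

fn decode_config(_bytes: &[u8]) -> ServerConfig {
    // Placeholder for decode_persisted_server_config; prost decodes an empty
    // buffer into a default message, so an empty body means "no databases".
    ServerConfig::default()
}

fn databases_to_init(server_id: u32) -> Result<Vec<(String, String)>, String> {
    let bytes = match fetch_config_bytes(server_id) {
        Ok(b) => b,
        // First startup: a missing config file is fine; proceed with an empty config.
        Err(StoreError::NotFound { .. }) => Vec::new(),
        // Any other error fails initialization; there is no longer a fallback
        // that lists database directories in object storage.
        Err(e) => return Err(format!("error getting server config: {:?}", e)),
    };
    Ok(decode_config(&bytes).databases.into_iter().collect())
}

fn main() {
    println!("{:?}", databases_to_init(42));
}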

View File

@@ -114,8 +114,6 @@ impl Generation {
impl IoxObjectStore {
/// Get the data for the server config to determine the names and locations of the databases
/// that this server owns.
// TODO: this is in the process of replacing list_possible_databases for the floating databases
// design
pub async fn get_server_config_file(inner: &ObjectStore, server_id: ServerId) -> Result<Bytes> {
let path = paths::server_config_path(inner, server_id);
let mut stream = inner.get(&path).await?;
@@ -155,35 +153,6 @@ impl IoxObjectStore {
RootPath::new(inner, server_id, database_name)
}
/// List database names and their locations in object storage that need to be further checked
/// for generations and whether they're marked as deleted or not.
// TODO: this is in the process of being deprecated in favor of get_server_config_file
pub async fn list_possible_databases(
inner: &ObjectStore,
server_id: ServerId,
) -> Result<Vec<(DatabaseName<'static>, String)>> {
let path = paths::all_databases_path(inner, server_id);
let list_result = inner.list_with_delimiter(&path).await?;
Ok(list_result
.common_prefixes
.into_iter()
.filter_map(|prefix| {
let prefix_parsed: DirsAndFileName = prefix.clone().into();
let last = prefix_parsed
.directories
.last()
.expect("path can't be empty");
let db_name = DatabaseName::new(last.encoded().to_string())
.log_if_error("invalid database directory")
.ok()?;
Some((db_name, prefix.to_raw()))
})
.collect())
}
/// List this server's databases in object storage along with their generation IDs.
pub async fn list_detailed_databases(
inner: &ObjectStore,
@@ -208,8 +177,7 @@ impl IoxObjectStore {
/// List database names in object storage along with all existing generations for each database
/// and whether the generations are marked as deleted or not. Useful for finding candidates
/// to restore or to permanently delete. Makes many more calls to object storage than
/// [`IoxObjectStore::list_possible_databases`].
/// to restore or to permanently delete. Makes many calls to object storage.
async fn list_all_databases(
inner: &ObjectStore,
server_id: ServerId,
@@ -1117,50 +1085,6 @@ mod tests {
iox_object_store.write_tombstone().await.unwrap();
}
#[tokio::test]
async fn list_possible_databases_returns_all_potential_databases() {
let object_store = make_object_store();
let server_id = make_server_id();
// Create a normal database, will be in the list
let db_normal = DatabaseName::new("db_normal").unwrap();
create_database(Arc::clone(&object_store), server_id, &db_normal).await;
// Create a database, then delete it - will still be in the list
let db_deleted = DatabaseName::new("db_deleted").unwrap();
let db_deleted_iox_store =
create_database(Arc::clone(&object_store), server_id, &db_deleted).await;
delete_database(&db_deleted_iox_store).await;
// Put a file in a directory that looks like a database directory but has no rules,
// will still be in the list
let not_a_db = DatabaseName::new("not_a_db").unwrap();
let mut not_rules_path = object_store.new_path();
not_rules_path.push_all_dirs(&[&server_id.to_string(), not_a_db.as_str(), "0"]);
not_rules_path.set_file_name("not_rules.txt");
object_store
.put(&not_rules_path, Bytes::new())
.await
.unwrap();
// Put a file in a directory that's an invalid database name - this WON'T be in the list
let invalid_db_name = ("a".repeat(65)).to_string();
let mut invalid_db_name_rules_path = object_store.new_path();
invalid_db_name_rules_path.push_all_dirs(&[&server_id.to_string(), &invalid_db_name, "0"]);
invalid_db_name_rules_path.set_file_name("rules.pb");
object_store
.put(&invalid_db_name_rules_path, Bytes::new())
.await
.unwrap();
let possible = IoxObjectStore::list_possible_databases(&object_store, server_id)
.await
.unwrap();
let mut names: Vec<_> = possible.into_iter().map(|d| d.0).collect();
names.sort();
assert_eq!(names, vec![db_deleted, db_normal, not_a_db]);
}
#[tokio::test]
async fn list_all_databases_returns_generation_info() {
let object_store = make_object_store();

View File

@@ -131,6 +131,11 @@ pub enum Error {
#[snafu(display("Missing aws-secret-access-key"))]
MissingSecretAccessKey,
NotFound {
location: String,
source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
},
}
/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
@@ -208,9 +213,18 @@ impl ObjectStoreApi for AmazonS3 {
.client
.get_object(get_request)
.await
.context(UnableToGetData {
.map_err(|e| match e {
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_)) => {
Error::NotFound {
location: key.clone(),
source: e,
}
}
_ => Error::UnableToGetData {
bucket: self.bucket_name.to_owned(),
location: key.clone(),
source: e,
},
})?
.body
.context(NoData {
@@ -729,20 +743,20 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToGetData {
source,
bucket,
location,
},
}) = err.downcast_ref::<ObjectStoreError>()
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(
source,
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_))
));
assert_eq!(bucket, &config.bucket);
let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>();
assert!(
matches!(
source_variant,
Some(rusoto_core::RusotoError::Service(
rusoto_s3::GetObjectError::NoSuchKey(_)
)),
),
"got: {:?}",
source_variant
);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);

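The S3 hunk above (and the local-file and GCS hunks that follow) all apply the same pattern: inspect the client library's error and turn its "object does not exist" case into a dedicated `NotFound` variant that records the location and keeps the original error as `source`. A standalone sketch of that pattern, with a made-up `BackendError` standing in for types like `rusoto_s3::GetObjectError` or `cloud_storage::Error`:

// Illustrative only: `BackendError` is a stand-in for a client-library error.
#[derive(Debug)]
enum BackendError {
    NoSuchKey(String),
    Timeout,
}

#[derive(Debug)]
enum Error {
    NotFound { location: String, source: BackendError },
    UnableToGetData { location: String, source: BackendError },
}

fn get(location: &str) -> Result<Vec<u8>, Error> {
    // Pretend call into the storage client library.
    let response: Result<Vec<u8>, BackendError> =
        Err(BackendError::NoSuchKey(location.to_string()));

    response.map_err(|e| match e {
        // Translate the library's "missing object" case into the shared variant,
        // keeping the original error as the source for debugging.
        BackendError::NoSuchKey(_) => Error::NotFound {
            location: location.to_string(),
            source: e,
        },
        // Everything else stays a generic "unable to get data" failure.
        _ => Error::UnableToGetData {
            location: location.to_string(),
            source: e,
        },
    })
}

fn main() {
    assert!(matches!(get("42/config.pb"), Err(Error::NotFound { .. })));
    println!("missing objects map to Error::NotFound");
}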
View File

@@ -30,7 +30,9 @@ pub enum Error {
},
#[snafu(display("Unable to walk dir: {}", source))]
UnableToWalkDir { source: walkdir::Error },
UnableToWalkDir {
source: walkdir::Error,
},
#[snafu(display("Unable to access metadata for {}: {}", path.display(), source))]
UnableToAccessMetadata {
@@ -39,22 +41,44 @@ pub enum Error {
},
#[snafu(display("Unable to copy data to file: {}", source))]
UnableToCopyDataToFile { source: io::Error },
UnableToCopyDataToFile {
source: io::Error,
},
#[snafu(display("Unable to create dir {}: {}", path.display(), source))]
UnableToCreateDir { source: io::Error, path: PathBuf },
UnableToCreateDir {
source: io::Error,
path: PathBuf,
},
#[snafu(display("Unable to create file {}: {}", path.display(), err))]
UnableToCreateFile { path: PathBuf, err: io::Error },
UnableToCreateFile {
path: PathBuf,
err: io::Error,
},
#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
UnableToDeleteFile { source: io::Error, path: PathBuf },
UnableToDeleteFile {
source: io::Error,
path: PathBuf,
},
#[snafu(display("Unable to open file {}: {}", path.display(), source))]
UnableToOpenFile { source: io::Error, path: PathBuf },
UnableToOpenFile {
source: io::Error,
path: PathBuf,
},
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes { source: io::Error, path: PathBuf },
UnableToReadBytes {
source: io::Error,
path: PathBuf,
},
NotFound {
location: String,
source: io::Error,
},
}
/// Local filesystem storage suitable for testing or for opting out of using a
@@ -110,9 +134,19 @@ impl ObjectStoreApi for File {
async fn get(&self, location: &Self::Path) -> Result<BoxStream<'static, Result<Bytes>>> {
let path = self.path(location);
let file = fs::File::open(&path)
.await
.context(UnableToOpenFile { path: &path })?;
let file = fs::File::open(&path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Error::NotFound {
location: location.to_string(),
source: e,
}
} else {
Error::UnableToOpenFile {
path: path.clone(),
source: e,
}
}
})?;
let s = FramedRead::new(file, BytesCodec::new())
.map_ok(|b| b.freeze())
@@ -297,14 +331,12 @@ impl File {
#[cfg(test)]
mod tests {
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
use super::*;
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
ObjectStore, ObjectStoreApi, ObjectStorePath,
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
use tempfile::TempDir;
#[tokio::test]
@@ -395,4 +427,32 @@ mod tests {
// `list_with_delimiter
assert!(store.list_with_delimiter(&store.new_path()).await.is_err());
}
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn get_nonexistent_location() {
let root = TempDir::new().unwrap();
let integration = ObjectStore::new_file(root.path());
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<std::io::Error>();
assert!(
matches!(source_variant, Some(std::io::Error { .. }),),
"got: {:?}",
source_variant
);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
}

View File

@@ -68,6 +68,11 @@ pub enum Error {
bucket: String,
location: String,
},
NotFound {
location: String,
source: cloud_storage::Error,
},
}
/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
@@ -122,9 +127,18 @@ impl ObjectStoreApi for GoogleCloudStorage {
.object()
.download(&bucket_name, &location_copy)
.await
.context(UnableToGetData {
bucket: &self.bucket_name,
.map_err(|e| match e {
cloud_storage::Error::Other(ref text) if text.starts_with("No such object") => {
Error::NotFound {
location,
source: e,
}
}
_ => Error::UnableToGetData {
bucket: bucket_name.clone(),
location,
source: e,
},
})?;
Ok(futures::stream::once(async move { Ok(bytes.into()) }).boxed())
@@ -337,21 +351,15 @@ mod test {
.await
.unwrap_err();
if let Some(ObjectStoreError::GcsObjectStoreError {
source:
Error::UnableToGetData {
source,
bucket,
location,
},
}) = err.downcast_ref::<ObjectStoreError>()
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<cloud_storage::Error>();
assert!(
matches!(source, cloud_storage::Error::Other(_)),
matches!(source_variant, Some(cloud_storage::Error::Other(_))),
"got: {:?}",
source
source_variant
);
assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err)

View File

@@ -292,12 +292,9 @@ impl ObjectStoreApi for ObjectStore {
(InMemoryThrottled(in_mem_throttled), path::Path::InMemory(location)) => {
in_mem_throttled.get(location).await?.err_into().boxed()
}
(File(file), path::Path::File(location)) => file
.get(location)
.await
.context(FileObjectStoreError)?
.err_into()
.boxed(),
(File(file), path::Path::File(location)) => {
file.get(location).await?.err_into().boxed()
}
(MicrosoftAzure(azure), path::Path::MicrosoftAzure(location)) => {
azure.get(location).await?.err_into().boxed()
}
@@ -609,25 +606,49 @@ pub enum Error {
#[snafu(display("{}", source))]
DummyObjectStoreError { source: dummy::Error },
#[snafu(display("Object at location {} not found: {}", location, source))]
NotFound {
location: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
}
impl From<disk::Error> for Error {
fn from(source: disk::Error) -> Self {
Self::FileObjectStoreError { source }
match source {
disk::Error::NotFound { location, source } => Self::NotFound {
location,
source: source.into(),
},
_ => Self::FileObjectStoreError { source },
}
}
}
#[cfg(feature = "gcp")]
impl From<gcp::Error> for Error {
fn from(source: gcp::Error) -> Self {
Self::GcsObjectStoreError { source }
match source {
gcp::Error::NotFound { location, source } => Self::NotFound {
location,
source: source.into(),
},
_ => Self::GcsObjectStoreError { source },
}
}
}
#[cfg(feature = "aws")]
impl From<aws::Error> for Error {
fn from(source: aws::Error) -> Self {
Self::AwsObjectStoreError { source }
match source {
aws::Error::NotFound { location, source } => Self::NotFound {
location,
source: source.into(),
},
_ => Self::AwsObjectStoreError { source },
}
}
}
@@ -640,7 +661,13 @@ impl From<azure::Error> for Error {
impl From<memory::Error> for Error {
fn from(source: memory::Error) -> Self {
Self::InMemoryObjectStoreError { source }
match source {
memory::Error::NoDataInMemory { ref location } => Self::NotFound {
location: location.into(),
source: source.into(),
},
// currently "not found" is the only error that can happen with the in-memory store
}
}
}

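With the `From` conversions above, a caller of the dispatching `ObjectStore` no longer has to unwrap backend-specific wrappers such as `AwsObjectStoreError` to detect a missing object; it can match the top-level `Error::NotFound` whatever backend is configured, which is exactly what the server init code further down does. A hedged sketch of the conversion-plus-caller shape, using a simplified `DiskError` rather than the real `disk::Error`:

// Sketch only: `DiskError` stands in for a backend error module like disk::Error.
#[derive(Debug)]
enum DiskError {
    NotFound { location: String, source: std::io::Error },
    Other { source: std::io::Error },
}

#[derive(Debug)]
enum Error {
    NotFound {
        location: String,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
    DiskBackend { source: DiskError },
}

impl From<DiskError> for Error {
    fn from(source: DiskError) -> Self {
        match source {
            // Preserve the "missing object" classification across the boundary.
            DiskError::NotFound { location, source } => Self::NotFound {
                location,
                source: source.into(),
            },
            // Every other error keeps its backend-specific wrapper.
            _ => Self::DiskBackend { source },
        }
    }
}

fn get(location: &str) -> Result<Vec<u8>, Error> {
    // Pretend the disk backend reported a missing file.
    let backend_result: Result<Vec<u8>, DiskError> = Err(DiskError::NotFound {
        location: location.to_string(),
        source: std::io::Error::new(std::io::ErrorKind::NotFound, "no such file"),
    });
    // `?` uses the From impl above, so the caller sees the unified NotFound.
    Ok(backend_result?)
}

fn main() {
    // Callers branch on one variant no matter which backend is configured.
    match get("42/config.pb") {
        Err(Error::NotFound { location, .. }) => println!("not found: {}", location),
        other => println!("unexpected: {:?}", other),
    }
}

Keeping the original error as a boxed `source` preserves the backend-specific detail for logs while giving callers a single variant to branch on.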
View File

@@ -159,8 +159,8 @@ mod tests {
use super::*;
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
ObjectStore, ObjectStoreApi, ObjectStorePath,
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::TryStreamExt;
@@ -194,4 +194,31 @@ mod tests {
.unwrap();
assert_eq!(&*read_data, expected_data);
}
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn nonexistent_location() {
let integration = ObjectStore::new_in_memory();
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(ObjectStoreError::NotFound { location, source }) =
err.downcast_ref::<ObjectStoreError>()
{
let source_variant = source.downcast_ref::<Error>();
assert!(
matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
"got: {:?}",
source_variant
);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
}

View File

@@ -1192,7 +1192,13 @@ async fn maybe_initialize_server(shared: &ServerShared) {
init_ready.server_id,
)
.await
.map_err(|e| InitError::GetServerConfig { source: e })
.or_else(|e| match e {
// If this is the first time starting up this server and there is no config file yet,
// this isn't a problem. Start an empty server config.
object_store::Error::NotFound { .. } => Ok(bytes::Bytes::new()),
// Any other error is a problem.
_ => Err(InitError::GetServerConfig { source: e }),
})
.and_then(|config| {
generated_types::server_config::decode_persisted_server_config(config)
.map_err(|e| InitError::DeserializeServerConfig { source: e })
@@ -1207,21 +1213,9 @@ async fn maybe_initialize_server(shared: &ServerShared) {
location,
)
})
.collect()
.collect::<Vec<_>>()
});
// TODO: This is a temporary fallback until the transition to server config files being the
// source of truth for database name and location is finished.
let maybe_databases = match maybe_databases {
Ok(maybe) => Ok(maybe),
Err(_) => IoxObjectStore::list_possible_databases(
shared.application.object_store(),
init_ready.server_id,
)
.await
.map_err(|e| InitError::ListDatabases { source: e }),
};
let next_state = match maybe_databases {
Ok(databases) => {
let mut state = ServerStateInitialized {
@@ -1634,7 +1628,7 @@ mod tests {
}
#[tokio::test]
async fn load_databases_and_transition_to_server_config() {
async fn load_databases() {
let application = make_application();
let server = make_server(Arc::clone(&application));
@@ -1666,13 +1660,6 @@ mod tests {
.await
.expect("cannot delete rules file");
// delete server config file - this is not something that's supposed to happen but is
// what will happen during the transition to using the server config file
let mut path = application.object_store().new_path();
path.push_dir(server_id.to_string());
path.set_file_name("config.pb");
application.object_store().delete(&path).await.unwrap();
let server = make_server(Arc::clone(&application));
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
@@ -2100,10 +2087,14 @@ mod tests {
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
let err = server.wait_for_init().await.unwrap_err();
assert!(matches!(err.as_ref(), InitError::ListDatabases { .. }));
assert!(
matches!(err.as_ref(), InitError::GetServerConfig { .. }),
"got: {:?}",
err
);
assert_contains!(
server.server_init_error().unwrap().to_string(),
"error listing databases in object storage:"
"error getting server config from object storage:"
);
}
@@ -2221,24 +2212,27 @@ mod tests {
let foo_db_name = DatabaseName::new("foo").unwrap();
// create a directory in object storage that looks like it could
// be a database directory, but doesn't have any valid generation
// directories in it
let mut fake_db_path = application.object_store().new_path();
fake_db_path.push_all_dirs(&[server_id.to_string().as_str(), foo_db_name.as_str()]);
let mut not_generation_file = fake_db_path.clone();
not_generation_file.set_file_name("not-a-generation");
application
.object_store()
.put(&not_generation_file, Bytes::new())
.await
.unwrap();
// start server
let server = make_server(Arc::clone(&application));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
// create database
create_simple_database(&server, &foo_db_name)
.await
.expect("failed to create database");
// delete database
server
.delete_database(&foo_db_name)
.await
.expect("failed to delete database");
// restart server
let server = make_server(Arc::clone(&application));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
// generic error MUST NOT be set
assert!(server.server_init_error().is_none());

View File

@@ -1121,8 +1121,8 @@ async fn test_get_server_status_global_error() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.management_client();
// we need to "break" the object store AFTER the server was started, otherwise the server process will exit
// immediately
// we need to "break" the object store AFTER the server was started, otherwise the server
// process will exit immediately
let metadata = server_fixture.dir().metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o000);
@@ -1137,7 +1137,8 @@ async fn test_get_server_status_global_error() {
loop {
let status = client.get_server_status().await.unwrap();
if let Some(err) = status.error {
assert!(dbg!(err.message).starts_with("error listing databases in object storage:"));
assert!(dbg!(err.message)
.starts_with("error getting server config from object storage:"));
assert!(status.database_statuses.is_empty());
return;
}
@@ -1208,6 +1209,33 @@ async fn test_get_server_status_db_error() {
other_gen_path.push("rules.pb");
std::fs::write(other_gen_path, "foo").unwrap();
// create the server config listing the ownership of these three databases
let mut path = server_fixture.dir().to_path_buf();
path.push("42");
path.push("config.pb");
let data = ServerConfig {
databases: vec![
(String::from("my_db"), String::from("42/my_db")),
(
String::from("soft_deleted"),
String::from("42/soft_deleted"),
),
(
String::from("multiple_active"),
String::from("42/multiple_active"),
),
]
.into_iter()
.collect(),
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_persisted_server_config(&data, &mut encoded)
.expect("server config serialization should be valid");
let encoded = encoded.freeze();
std::fs::write(path, encoded).unwrap();
// initialize
client.update_server_id(42).await.expect("set ID failed");
server_fixture.wait_server_initialized().await;