Merge pull request #2887 from influxdata/cn/write-db-owner-file

feat: Write owner info in the database's object store directory
pull/24376/head
kodiakhq[bot] 2021-10-20 13:58:21 +00:00 committed by GitHub
commit 6e81bde4f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 305 additions and 9 deletions

View File

@ -11,3 +11,13 @@ option go_package = "github.com/influxdata/iox/management/v1";
message ServerConfig {
map<string, string> databases = 1;
}
// Identifies the server that owns a database. An encoded OwnerInfo is stored in the database's
// object store directory so that ownership of the database can be verified.
message OwnerInfo {
// The ID of the server that owns this database
uint32 id = 1;
// The path to the owning server's config file in object storage
string location = 2;
}

View File

@ -403,6 +403,9 @@ message DatabaseStatus {
// No active database
DATABASE_STATE_NO_ACTIVE_DATABASE = 10;
// Database owner info has been loaded
DATABASE_STATE_OWNER_INFO_LOADED = 11;
// Rules are loaded
DATABASE_STATE_RULES_LOADED = 2;
@ -415,6 +418,9 @@ message DatabaseStatus {
// Error loading rules
DATABASE_STATE_RULES_LOAD_ERROR = 5;
// Error loading owner info
DATABASE_STATE_OWNER_INFO_LOAD_ERROR = 12;
// Error during catalog load
DATABASE_STATE_CATALOG_LOAD_ERROR = 6;

View File

@ -15,6 +15,8 @@ impl DatabaseState {
DatabaseState::DatabaseObjectStoreFound => "DatabaseObjectStoreFound",
DatabaseState::DatabaseObjectStoreLookupError => "DatabaseObjectStoreLookupError",
DatabaseState::NoActiveDatabase => "NoActiveDatabase",
DatabaseState::OwnerInfoLoaded => "OwnerInfoLoaded",
DatabaseState::OwnerInfoLoadError => "OwnerInfoLoadError",
DatabaseState::Unspecified => "Unspecified",
}
}

View File

@ -14,3 +14,20 @@ pub fn encode_persisted_server_config(
) -> Result<(), EncodeError> {
prost::Message::encode(server_config, bytes)
}
/// Serialize `owner_info` into `bytes`.
///
/// The encoded record is meant to be written into a database's object store directory, where it
/// identifies the server that owns that database.
pub fn encode_database_owner_info(
    owner_info: &management::OwnerInfo,
    bytes: &mut prost::bytes::BytesMut,
) -> Result<(), EncodeError> {
    <management::OwnerInfo as prost::Message>::encode(owner_info, bytes)
}
/// Decode server information that was encoded using `encode_database_owner_info`, so that it
/// can be compared with the currently-running server.
///
/// # Errors
///
/// Returns a `DecodeError` if `bytes` cannot be decoded as a `management::OwnerInfo`.
pub fn decode_database_owner_info(
    bytes: prost::bytes::Bytes,
) -> Result<management::OwnerInfo, DecodeError> {
    prost::Message::decode(bytes)
}

View File

@ -139,6 +139,12 @@ impl IoxObjectStore {
inner.put(&path, bytes).await
}
/// Build the object store path of the config file belonging to the server `server_id`.
///
/// Database ownership information records this path to identify the server that a database
/// considers to be its owner.
pub fn server_config_path(inner: &ObjectStore, server_id: ServerId) -> Path {
    let config_path = paths::server_config_path(inner, server_id);
    config_path
}
/// Returns what the root path would be for a given database. Does not check existence or
/// validity of the path in object storage.
pub fn root_path_for(
@ -421,6 +427,29 @@ impl IoxObjectStore {
}
}
/// Write `bytes` to the owner file in the database's root directory.
///
/// The file points at the owning server's config and serves as an extra check on which server
/// owns this database.
pub async fn put_owner_file(&self, bytes: Bytes) -> Result<()> {
    self.inner
        .put(&self.generation_path.owner_path(), bytes)
        .await
}
/// Read the owner file from the database's root directory and return its full contents.
///
/// The file describes the server that owns this database. The object store hands the data back
/// as a stream of chunks; they are concatenated here into a single buffer.
pub async fn get_owner_file(&self) -> Result<Bytes> {
    let location = self.generation_path.owner_path();
    let mut chunks = self.inner.get(&location).await?;

    let mut contents = BytesMut::new();
    while let Some(chunk) = chunks.next().await {
        // A failed chunk aborts the read and surfaces the object store error.
        let chunk = chunk?;
        contents.extend(chunk);
    }

    Ok(contents.freeze())
}
/// The location in object storage for all files for this database's generation, suitable for
/// logging or debugging purposes only. Do not parse this, as its format is subject to change!
pub fn debug_database_path(&self) -> String {
@ -945,6 +974,56 @@ mod tests {
assert_eq!(file_count, 0);
}
/// Test helper: build the path `<server_id>/<db_name>/0/owner.pb` by hand, independently of
/// the path helpers under test.
fn make_owner_path(object_store: &ObjectStore, server_id: ServerId, db_name: &str) -> Path {
    let server_dir = server_id.to_string();

    let mut owner_path = object_store.new_path();
    owner_path.push_all_dirs(&[server_dir.as_str(), db_name, "0"]);
    owner_path.set_file_name("owner.pb");
    owner_path
}
/// The owner info must live in a plain file at the hand-built owner path: data written through
/// `put_owner_file` is visible via the raw object store, and data written via the raw object
/// store is returned by `get_owner_file`.
#[tokio::test]
async fn owner_should_be_a_file() {
    let object_store = make_object_store();
    let server_id = make_server_id();
    let database_name = DatabaseName::new("clouds").unwrap();
    let iox_object_store =
        IoxObjectStore::new(Arc::clone(&object_store), server_id, &database_name)
            .await
            .unwrap();
    let owner_path = make_owner_path(&object_store, server_id, "clouds");

    // PUT
    let written = Bytes::from("hello world");
    iox_object_store
        .put_owner_file(written.clone())
        .await
        .unwrap();

    let mut raw_stream = object_store.get(&owner_path).await.unwrap();
    let first_chunk = raw_stream.next().await.unwrap().unwrap();
    assert_eq!(written, first_chunk);

    // GET
    let replacement = Bytes::from("goodbye moon");
    object_store
        .put(&owner_path, replacement.clone())
        .await
        .unwrap();

    let fetched = iox_object_store.get_owner_file().await.unwrap();
    assert_eq!(replacement, fetched);
}
#[tokio::test]
async fn write_tombstone_twice_is_fine() {
let object_store = make_object_store();

View File

@ -15,6 +15,7 @@ pub mod transaction_file;
use transaction_file::TransactionFilePath;
const SERVER_CONFIG_FILE_NAME: &str = "config.pb";
const DATABASE_OWNER_FILE_NAME: &str = "owner.pb";
/// The path to the server file containing the list of databases this server owns.
// TODO: this is in the process of replacing all_databases_path for the floating databases design
@ -107,6 +108,12 @@ impl GenerationPath {
pub(crate) fn transactions_path(&self) -> TransactionsPath {
TransactionsPath::new(self)
}
/// Path of the owner file: a copy of this generation's path with the file name set to
/// `DATABASE_OWNER_FILE_NAME`.
pub(crate) fn owner_path(&self) -> Path {
    let mut owner_file = self.inner.clone();
    owner_file.set_file_name(DATABASE_OWNER_FILE_NAME);
    owner_file
}
}
#[derive(Debug, Clone)]

View File

@ -16,7 +16,9 @@ use futures::{
future::{BoxFuture, FusedFuture, Shared},
FutureExt, TryFutureExt,
};
use generated_types::database_state::DatabaseState as DatabaseStateCode;
use generated_types::{
database_state::DatabaseState as DatabaseStateCode, influxdata::iox::management,
};
use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use observability_deps::tracing::{error, info, warn};
@ -212,6 +214,21 @@ impl Database {
let location = iox_object_store.root_path();
let owner_info = management::v1::OwnerInfo {
id: server_id.get_u32(),
location: IoxObjectStore::server_config_path(application.object_store(), server_id)
.to_string(),
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
.expect("owner info serialization should be valid");
let encoded = encoded.freeze();
iox_object_store
.put_owner_file(encoded)
.await
.context(SavingOwner)?;
provided_rules
.persist(uuid, &iox_object_store)
.await
@ -365,6 +382,11 @@ impl Database {
self.shared.state.read().provided_rules()
}
/// Info about the server that owns this database, or `None` if the current state does not
/// expose it.
pub fn owner_info(&self) -> Option<management::v1::OwnerInfo> {
    let state = self.shared.state.read();
    state.owner_info()
}
/// Always-constructable location in object store; may not actually exist yet
pub fn location(&self) -> String {
IoxObjectStore::root_path_for(
@ -480,11 +502,13 @@ impl Database {
match &**self.shared.state.read() {
DatabaseState::Known(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_) => {} // Non-terminal state
DatabaseState::Initialized(_) => return Ok(()),
DatabaseState::DatabaseObjectStoreLookupError(_, e)
| DatabaseState::NoActiveDatabase(_, e)
| DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => return Err(Arc::clone(e)),
@ -819,6 +843,7 @@ async fn initialize_database(shared: &DatabaseShared) {
// Can perform work
DatabaseState::Known(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_) => {
match state.try_freeze() {
@ -837,6 +862,7 @@ async fn initialize_database(shared: &DatabaseShared) {
}
// Operator intervention required
DatabaseState::DatabaseObjectStoreLookupError(_, e)
| DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => {
@ -873,6 +899,10 @@ async fn initialize_database(shared: &DatabaseShared) {
Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)),
},
DatabaseState::DatabaseObjectStoreFound(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::OwnerInfoLoaded(state),
Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)),
},
DatabaseState::OwnerInfoLoaded(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::RulesLoaded(state),
Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)),
},
@ -930,6 +960,24 @@ pub enum InitError {
#[snafu(display("error during replay: {}", source))]
Replay { source: crate::db::Error },
#[snafu(display("error saving database owner: {}", source))]
SavingOwner { source: object_store::Error },
#[snafu(display("error loading owner info: {}", source))]
LoadingOwnerInfo { source: object_store::Error },
#[snafu(display("error decoding owner info: {}", source))]
DecodingOwnerInfo {
source: generated_types::DecodeError,
},
#[snafu(display(
"Server ID in the database's owner info file ({}) does not match this server's ID ({})",
actual,
expected
))]
DatabaseOwnerMismatch { actual: u32, expected: u32 },
#[snafu(display("error saving database rules: {}", source))]
SavingRules { source: crate::rules::Error },
@ -957,14 +1005,15 @@ pub enum InitError {
/// The states a database can be in while it is being initialized.
///
/// Error variants pair the state that was current when a step failed with the `InitError`
/// that the step produced.
enum DatabaseState {
// States without an error attached
Known(DatabaseStateKnown),
DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound),
OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
RulesLoaded(DatabaseStateRulesLoaded),
CatalogLoaded(DatabaseStateCatalogLoaded),
Initialized(DatabaseStateInitialized),
// States that record a failure
DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
// NOTE(review): `RulesLoadError` is listed twice in this view, once carrying
// `DatabaseStateDatabaseObjectStoreFound` and once carrying `DatabaseStateOwnerInfoLoaded`.
// This text appears to be a rendered diff, so the line directly below is presumably the
// removed pre-change variant — confirm against the real file.
RulesLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
}
@ -982,6 +1031,7 @@ impl DatabaseState {
DatabaseState::DatabaseObjectStoreFound(_) => {
DatabaseStateCode::DatabaseObjectStoreFound
}
DatabaseState::OwnerInfoLoaded(_) => DatabaseStateCode::OwnerInfoLoaded,
DatabaseState::RulesLoaded(_) => DatabaseStateCode::RulesLoaded,
DatabaseState::CatalogLoaded(_) => DatabaseStateCode::CatalogLoaded,
DatabaseState::Initialized(_) => DatabaseStateCode::Initialized,
@ -989,6 +1039,7 @@ impl DatabaseState {
DatabaseStateCode::DatabaseObjectStoreLookupError
}
DatabaseState::NoActiveDatabase(_, _) => DatabaseStateCode::NoActiveDatabase,
DatabaseState::OwnerInfoLoadError(_, _) => DatabaseStateCode::OwnerInfoLoadError,
DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::RulesLoadError,
DatabaseState::CatalogLoadError(_, _) => DatabaseStateCode::CatalogLoadError,
DatabaseState::ReplayError(_, _) => DatabaseStateCode::ReplayError,
@ -999,11 +1050,13 @@ impl DatabaseState {
match self {
DatabaseState::Known(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_)
| DatabaseState::Initialized(_) => None,
DatabaseState::DatabaseObjectStoreLookupError(_, e)
| DatabaseState::NoActiveDatabase(_, e)
| DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => Some(e),
@ -1016,6 +1069,8 @@ impl DatabaseState {
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::DatabaseObjectStoreLookupError(_, _)
| DatabaseState::NoActiveDatabase(_, _)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::OwnerInfoLoadError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
Some(Arc::clone(&state.provided_rules))
@ -1027,13 +1082,35 @@ impl DatabaseState {
}
}
/// Clone of the owner info held by the current state, or `None` for states that do not
/// report one.
///
/// NOTE(review): `RulesLoadError` is reported as `None` here even though the variant that
/// carries `DatabaseStateOwnerInfoLoaded` has an `owner_info` field — confirm this is intended.
fn owner_info(&self) -> Option<management::v1::OwnerInfo> {
    let loaded = match self {
        DatabaseState::OwnerInfoLoaded(state) => &state.owner_info,
        DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
            &state.owner_info
        }
        DatabaseState::CatalogLoaded(state) | DatabaseState::ReplayError(state, _) => {
            &state.owner_info
        }
        DatabaseState::Initialized(state) => &state.owner_info,
        DatabaseState::Known(_)
        | DatabaseState::DatabaseObjectStoreFound(_)
        | DatabaseState::DatabaseObjectStoreLookupError(_, _)
        | DatabaseState::NoActiveDatabase(_, _)
        | DatabaseState::OwnerInfoLoadError(_, _)
        | DatabaseState::RulesLoadError(_, _) => return None,
    };

    Some(loaded.clone())
}
fn iox_object_store(&self) -> Option<Arc<IoxObjectStore>> {
match self {
DatabaseState::Known(_)
| DatabaseState::DatabaseObjectStoreLookupError(_, _)
| DatabaseState::NoActiveDatabase(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::DatabaseObjectStoreFound(state) => {
| DatabaseState::NoActiveDatabase(_, _) => None,
DatabaseState::DatabaseObjectStoreFound(state)
| DatabaseState::OwnerInfoLoadError(state, _) => {
Some(Arc::clone(&state.iox_object_store))
}
DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => {
Some(Arc::clone(&state.iox_object_store))
}
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
@ -1091,6 +1168,42 @@ struct DatabaseStateDatabaseObjectStoreFound {
}
impl DatabaseStateDatabaseObjectStoreFound {
    /// Read the owner file from object storage, decode it, and check that the recorded server
    /// ID is the ID of the server running this code.
    ///
    /// # Errors
    ///
    /// - `InitError::LoadingOwnerInfo` if the owner file cannot be read
    /// - `InitError::DecodingOwnerInfo` if its contents cannot be decoded
    /// - `InitError::DatabaseOwnerMismatch` if it names a different server ID
    async fn advance(
        &self,
        shared: &DatabaseShared,
    ) -> Result<DatabaseStateOwnerInfoLoaded, InitError> {
        let encoded = self
            .iox_object_store
            .get_owner_file()
            .await
            .context(LoadingOwnerInfo)?;
        let owner_info = generated_types::server_config::decode_database_owner_info(encoded)
            .context(DecodingOwnerInfo)?;

        let expected = shared.config.server_id.get_u32();
        if owner_info.id == expected {
            Ok(DatabaseStateOwnerInfoLoaded {
                owner_info,
                iox_object_store: Arc::clone(&self.iox_object_store),
            })
        } else {
            DatabaseOwnerMismatch {
                actual: owner_info.id,
                expected,
            }
            .fail()
        }
    }
}
/// Database state in which the owner info has been loaded; the rules have not been loaded yet.
#[derive(Debug, Clone)]
struct DatabaseStateOwnerInfoLoaded {
/// Owner info for this database
owner_info: management::v1::OwnerInfo,
/// Shared handle to this database's object store
iox_object_store: Arc<IoxObjectStore>,
}
impl DatabaseStateOwnerInfoLoaded {
/// Load database rules from object storage
async fn advance(
&self,
@ -1110,6 +1223,7 @@ impl DatabaseStateDatabaseObjectStoreFound {
Ok(DatabaseStateRulesLoaded {
provided_rules: Arc::new(rules),
owner_info: self.owner_info.clone(),
iox_object_store: Arc::clone(&self.iox_object_store),
})
}
@ -1118,6 +1232,7 @@ impl DatabaseStateDatabaseObjectStoreFound {
/// Database state in which the database rules have been loaded.
#[derive(Debug, Clone)]
struct DatabaseStateRulesLoaded {
/// The loaded database rules
provided_rules: Arc<ProvidedDatabaseRules>,
/// Owner info for this database
owner_info: management::v1::OwnerInfo,
/// Shared handle to this database's object store
iox_object_store: Arc<IoxObjectStore>,
}
@ -1175,6 +1290,7 @@ impl DatabaseStateRulesLoaded {
lifecycle_worker,
replay_plan: Arc::new(replay_plan),
provided_rules: Arc::clone(&self.provided_rules),
owner_info: self.owner_info.clone(),
})
}
}
@ -1185,6 +1301,7 @@ struct DatabaseStateCatalogLoaded {
lifecycle_worker: Arc<LifecycleWorker>,
replay_plan: Arc<Option<ReplayPlan>>,
provided_rules: Arc<ProvidedDatabaseRules>,
owner_info: management::v1::OwnerInfo,
}
impl DatabaseStateCatalogLoaded {
@ -1232,6 +1349,7 @@ impl DatabaseStateCatalogLoaded {
write_buffer_consumer,
lifecycle_worker: Arc::clone(&self.lifecycle_worker),
provided_rules: Arc::clone(&self.provided_rules),
owner_info: self.owner_info.clone(),
})
}
}
@ -1242,6 +1360,7 @@ pub struct DatabaseStateInitialized {
write_buffer_consumer: Option<Arc<WriteBufferConsumer>>,
lifecycle_worker: Arc<LifecycleWorker>,
provided_rules: Arc<ProvidedDatabaseRules>,
owner_info: management::v1::OwnerInfo,
}
impl DatabaseStateInitialized {

View File

@ -1492,7 +1492,7 @@ mod tests {
}
#[tokio::test]
async fn create_database_persists_rules_and_server_config() {
async fn create_database_persists_rules_owner_and_server_config() {
let application = make_application();
let server = make_server(Arc::clone(&application));
let server_id = ServerId::try_from(1).unwrap();
@ -1539,6 +1539,14 @@ mod tests {
// rules that are being used are the same
assert_eq!(provided_rules.rules(), read_rules.rules());
// assert this database knows it's owned by this server
let owner_info = bananas.owner_info().unwrap();
assert_eq!(owner_info.id, server_id.get_u32());
assert_eq!(
owner_info.location,
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string()
);
// assert server config file exists and has 1 entry
let config = server_config(application.object_store(), server_id).await;
assert_config_contents(&config, &[(&name, String::from("1/bananas/"))]);
@ -2122,6 +2130,7 @@ mod tests {
let foo_db_name = DatabaseName::new("foo").unwrap();
let bar_db_name = DatabaseName::new("bar").unwrap();
let baz_db_name = DatabaseName::new("baz").unwrap();
// create database foo
create_simple_database(&server, "foo")
@ -2142,6 +2151,24 @@ mod tests {
.unwrap();
iox_object_store.get_database_rules_file().await.unwrap();
// create database baz so it gets written to the server config
let baz = create_simple_database(&server, "baz")
.await
.expect("failed to create database");
// make the owner info for baz say it's owned by a different server
let baz_iox_object_store = baz.iox_object_store().unwrap();
let owner_info = management::v1::OwnerInfo {
id: 2,
location: "2/config.pb".to_string(),
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
.expect("owner info serialization should be valid");
let encoded = encoded.freeze();
baz_iox_object_store.put_owner_file(encoded).await.unwrap();
// restart server
let server = make_server(application);
server.set_id(server_id).unwrap();
@ -2156,11 +2183,12 @@ mod tests {
// DB names contain all DBs
assert_eq!(
server.db_names_sorted(),
vec!["bar".to_string(), "foo".to_string()]
vec!["bar".to_string(), "baz".to_string(), "foo".to_string()]
);
let foo_database = server.database(&foo_db_name).unwrap();
let bar_database = server.database(&bar_db_name).unwrap();
let baz_database = server.database(&baz_db_name).unwrap();
foo_database.wait_for_init().await.unwrap();
assert!(foo_database.init_error().is_none());
@ -2173,6 +2201,12 @@ mod tests {
);
assert!(Arc::ptr_eq(&err, &bar_database.init_error().unwrap()));
let baz_err = baz_database.wait_for_init().await.unwrap_err();
assert_contains!(
baz_err.to_string(),
"Server ID in the database's owner info file (2) does not match this server's ID (1)"
);
// can only write to successfully created DBs
let lines = parsed_lines("cpu foo=1 10");
server

View File

@ -1177,12 +1177,25 @@ async fn test_get_server_status_db_error() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.management_client();
// create malformed DB config
// All databases are owned by server 42
let owner_info = OwnerInfo {
id: 42,
location: "arbitrary".to_string(),
};
let mut owner_info_bytes = bytes::BytesMut::new();
generated_types::server_config::encode_database_owner_info(&owner_info, &mut owner_info_bytes)
.expect("owner info serialization should be valid");
let owner_info_bytes = owner_info_bytes.freeze();
// create valid owner info but malformed DB rules
let mut path = server_fixture.dir().to_path_buf();
path.push("42");
path.push("my_db");
path.push("0");
std::fs::create_dir_all(path.clone()).unwrap();
let mut owner_info_path = path.clone();
owner_info_path.push("owner.pb");
std::fs::write(owner_info_path, &owner_info_bytes).unwrap();
path.push("rules.pb");
std::fs::write(path, "foo").unwrap();
@ -1192,6 +1205,9 @@ async fn test_get_server_status_db_error() {
path.push("soft_deleted");
path.push("0");
std::fs::create_dir_all(path.clone()).unwrap();
let mut owner_info_path = path.clone();
owner_info_path.push("owner.pb");
std::fs::write(owner_info_path, &owner_info_bytes).unwrap();
path.push("DELETED");
std::fs::write(path, "foo").unwrap();
@ -1202,10 +1218,16 @@ async fn test_get_server_status_db_error() {
let mut other_gen_path = path.clone();
path.push("0");
std::fs::create_dir_all(path.clone()).unwrap();
let mut owner_info_path = path.clone();
owner_info_path.push("owner.pb");
std::fs::write(owner_info_path, &owner_info_bytes).unwrap();
path.push("rules.pb");
std::fs::write(path, "foo").unwrap();
other_gen_path.push("1");
std::fs::create_dir_all(other_gen_path.clone()).unwrap();
let mut owner_info_path = other_gen_path.clone();
owner_info_path.push("owner.pb");
std::fs::write(owner_info_path, &owner_info_bytes).unwrap();
other_gen_path.push("rules.pb");
std::fs::write(other_gen_path, "foo").unwrap();