feat: add transactions for context to database owner info on disown
parent
fb7bde527f
commit
7e13cb77ac
|
@ -1,6 +1,8 @@
|
|||
syntax = "proto3";
|
||||
package influxdata.iox.management.v1;
|
||||
option go_package = "github.com/influxdata/iox/management/v1";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "google/protobuf/any.proto";
|
||||
|
||||
// Stores a server's map of the databases it owns. The keys are the database names and the values
|
||||
// are the database's location in object storage.
|
||||
|
@ -14,10 +16,32 @@ message ServerConfig {
|
|||
|
||||
// Stores information about a server that owns a database. To be stored in a database's object
|
||||
// store directory as verification of ownership.
|
||||
//
|
||||
// Like the rules.pb file, this is a configuration file about a database. Unlike the rules.pb file,
|
||||
// this file cannot be directly updated by the user and contains more low level information about
|
||||
// the database that must be manipulated through specific IOx API calls.
|
||||
message OwnerInfo {
|
||||
// The ID of the server that owns this database
|
||||
uint32 id = 1;
|
||||
|
||||
// The path to this server's config file in object storage
|
||||
string location = 2;
|
||||
|
||||
// Recent history of disown/adopt transactions (truncated at 100 transactions)
|
||||
repeated OwnershipTransaction transactions = 3;
|
||||
}
|
||||
|
||||
message OwnershipTransaction {
|
||||
// Copy of the ID field at the time of the transaction.
|
||||
uint32 id = 1;
|
||||
|
||||
// Copy of the path to this server's config file in object storage at the time of the
|
||||
// transaction.
|
||||
string location = 2;
|
||||
|
||||
// When the transaction took place
|
||||
google.protobuf.Timestamp timestamp = 3;
|
||||
|
||||
// Context provided in the disown/adopt calls
|
||||
repeated google.protobuf.Any context = 4;
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ option go_package = "github.com/influxdata/iox/management/v1";
|
|||
|
||||
import "google/longrunning/operations.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "google/protobuf/any.proto";
|
||||
import "influxdata/iox/management/v1/database_rules.proto";
|
||||
import "influxdata/iox/management/v1/chunk.proto";
|
||||
import "influxdata/iox/management/v1/partition.proto";
|
||||
|
@ -221,8 +222,10 @@ message DisownDatabaseRequest {
|
|||
// the optional UUID of the database that must match
|
||||
bytes uuid = 2;
|
||||
|
||||
// the optional context to record with the history of this disown operation
|
||||
string context = 3;
|
||||
// optional context to annotate the disown event in the disowned database's owner info file.
|
||||
// The context objects are qualified by protobuf message type.
|
||||
// The user/orchestration system can provide their own messages.
|
||||
repeated google.protobuf.Any context = 3;
|
||||
}
|
||||
|
||||
message DisownDatabaseResponse {
|
||||
|
|
|
@ -202,11 +202,6 @@ where
|
|||
} else {
|
||||
Some(Uuid::from_slice(&uuid).field("uuid")?)
|
||||
};
|
||||
let context = if context.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(context)
|
||||
};
|
||||
|
||||
let returned_uuid = self
|
||||
.server
|
||||
|
|
|
@ -1324,6 +1324,7 @@ async fn test_get_server_status_db_error() {
|
|||
let owner_info = OwnerInfo {
|
||||
id: 42,
|
||||
location: "arbitrary".to_string(),
|
||||
transactions: vec![],
|
||||
};
|
||||
let mut owner_info_bytes = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut owner_info_bytes)
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use self::generated_types::{management_service_client::ManagementServiceClient, *};
|
||||
use crate::{
|
||||
connection::Connection,
|
||||
google::{longrunning::IoxOperation, FieldViolation},
|
||||
google::{self, longrunning::IoxOperation, FieldViolation},
|
||||
protobuf_type_url,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use std::{convert::TryInto, num::NonZeroU32};
|
||||
|
@ -740,7 +741,10 @@ impl Client {
|
|||
.map(|s| s.parse::<Uuid>().map(|u| u.as_bytes().to_vec()))
|
||||
.transpose()?
|
||||
.unwrap_or_default(),
|
||||
context: context.unwrap_or_default(),
|
||||
context: vec![google::protobuf::Any {
|
||||
type_url: protobuf_type_url("google.protobuf.StringValue"),
|
||||
value: context.unwrap_or_default().into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
|
|
|
@ -14,7 +14,7 @@ use futures::{
|
|||
FutureExt, TryFutureExt,
|
||||
};
|
||||
use generated_types::{
|
||||
database_state::DatabaseState as DatabaseStateCode, influxdata::iox::management,
|
||||
database_state::DatabaseState as DatabaseStateCode, google, influxdata::iox::management,
|
||||
};
|
||||
use internal_types::freezable::Freezable;
|
||||
use iox_object_store::IoxObjectStore;
|
||||
|
@ -25,6 +25,7 @@ use parquet_catalog::core::PreservedCatalog;
|
|||
use persistence_windows::checkpoint::ReplayPlan;
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::{future::Future, sync::Arc, time::Duration};
|
||||
use time::Time;
|
||||
use tokio::{sync::Notify, task::JoinError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use uuid::Uuid;
|
||||
|
@ -273,7 +274,11 @@ impl Database {
|
|||
}
|
||||
|
||||
/// Disown this database from this server.
|
||||
pub async fn disown(&self, context: Option<String>) -> Result<Uuid, Error> {
|
||||
pub async fn disown(
|
||||
&self,
|
||||
context: Vec<google::protobuf::Any>,
|
||||
timestamp: Time,
|
||||
) -> Result<Uuid, Error> {
|
||||
let db_name = &self.shared.config.name;
|
||||
|
||||
let handle = self.shared.state.read().freeze();
|
||||
|
@ -301,7 +306,7 @@ impl Database {
|
|||
}
|
||||
})?;
|
||||
|
||||
update_owner_info(None, None, &iox_object_store)
|
||||
update_owner_info(None, None, context, timestamp, &iox_object_store)
|
||||
.await
|
||||
.context(CannotDisown { db_name })?;
|
||||
|
||||
|
@ -345,9 +350,16 @@ impl Database {
|
|||
let server_location =
|
||||
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
|
||||
|
||||
update_owner_info(Some(server_id), Some(server_location), &iox_object_store)
|
||||
.await
|
||||
.context(UpdatingOwnerInfo)?;
|
||||
let timestamp = Time::from_timestamp(295293, 3);
|
||||
update_owner_info(
|
||||
Some(server_id),
|
||||
Some(server_location),
|
||||
vec![],
|
||||
timestamp,
|
||||
&iox_object_store,
|
||||
)
|
||||
.await
|
||||
.context(UpdatingOwnerInfo)?;
|
||||
|
||||
Ok(database_location)
|
||||
}
|
||||
|
@ -1255,6 +1267,7 @@ async fn create_owner_info(
|
|||
let owner_info = management::v1::OwnerInfo {
|
||||
id: server_id.get_u32(),
|
||||
location: server_location,
|
||||
transactions: vec![],
|
||||
};
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
|
@ -1284,19 +1297,38 @@ pub enum OwnerInfoUpdateError {
|
|||
async fn update_owner_info(
|
||||
new_server_id: Option<ServerId>,
|
||||
new_server_location: Option<String>,
|
||||
context: Vec<google::protobuf::Any>,
|
||||
timestamp: Time,
|
||||
iox_object_store: &IoxObjectStore,
|
||||
) -> Result<(), OwnerInfoUpdateError> {
|
||||
let mut owner_info = fetch_owner_info(iox_object_store)
|
||||
let management::v1::OwnerInfo {
|
||||
id,
|
||||
location,
|
||||
mut transactions,
|
||||
} = fetch_owner_info(iox_object_store)
|
||||
.await
|
||||
.context(CouldNotFetch)?;
|
||||
|
||||
// 0 is not a valid server ID, so it indicates "unowned".
|
||||
owner_info.id = new_server_id.map(|s| s.get_u32()).unwrap_or_default();
|
||||
// Owner location is empty when the database is unowned.
|
||||
owner_info.location = new_server_location.unwrap_or_default();
|
||||
let new_transaction = management::v1::OwnershipTransaction {
|
||||
id,
|
||||
location,
|
||||
timestamp: Some(timestamp.date_time().into()),
|
||||
context,
|
||||
};
|
||||
transactions.push(new_transaction);
|
||||
|
||||
// TODO: only save latest 100 transactions
|
||||
|
||||
let new_owner_info = management::v1::OwnerInfo {
|
||||
// 0 is not a valid server ID, so it indicates "unowned".
|
||||
id: new_server_id.map(|s| s.get_u32()).unwrap_or_default(),
|
||||
// Owner location is empty when the database is unowned.
|
||||
location: new_server_location.unwrap_or_default(),
|
||||
transactions,
|
||||
};
|
||||
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
generated_types::server_config::encode_database_owner_info(&new_owner_info, &mut encoded)
|
||||
.expect("owner info serialization should be valid");
|
||||
let encoded = encoded.freeze();
|
||||
|
||||
|
@ -1492,11 +1524,12 @@ mod tests {
|
|||
use crate::test_utils::make_application;
|
||||
|
||||
use super::*;
|
||||
use data_types::sequence::Sequence;
|
||||
use data_types::{
|
||||
database_rules::{PartitionTemplate, TemplatePart},
|
||||
sequence::Sequence,
|
||||
write_buffer::{WriteBufferConnection, WriteBufferDirection},
|
||||
};
|
||||
use generated_types::protobuf_type_url;
|
||||
use std::{num::NonZeroU32, time::Instant};
|
||||
use uuid::Uuid;
|
||||
use write_buffer::mock::MockBufferSharedState;
|
||||
|
@ -1666,10 +1699,14 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn database_disown() {
|
||||
let (_application, database) = initialized_database().await;
|
||||
let (application, database) = initialized_database().await;
|
||||
let server_id = database.shared.config.server_id;
|
||||
let server_location =
|
||||
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
|
||||
let iox_object_store = database.iox_object_store().unwrap();
|
||||
let timestamp = Time::from_timestamp(295293, 3);
|
||||
|
||||
database.disown(None).await.unwrap();
|
||||
database.disown(vec![], timestamp).await.unwrap();
|
||||
|
||||
assert_eq!(database.state_code(), DatabaseStateCode::NoActiveDatabase);
|
||||
assert!(matches!(
|
||||
|
@ -1680,15 +1717,35 @@ mod tests {
|
|||
let owner_info = fetch_owner_info(&iox_object_store).await.unwrap();
|
||||
assert_eq!(owner_info.id, 0);
|
||||
assert_eq!(owner_info.location, "");
|
||||
assert_eq!(owner_info.transactions.len(), 1);
|
||||
|
||||
let transaction = &owner_info.transactions[0];
|
||||
assert_eq!(transaction.id, server_id.get_u32());
|
||||
assert_eq!(transaction.location, server_location);
|
||||
let expected_timestamp: google::protobuf::Timestamp = timestamp.date_time().into();
|
||||
assert_eq!(transaction.timestamp, Some(expected_timestamp));
|
||||
assert!(transaction.context.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_disown_with_context() {
|
||||
let (_application, database) = initialized_database().await;
|
||||
let (application, database) = initialized_database().await;
|
||||
let server_id = database.shared.config.server_id;
|
||||
let server_location =
|
||||
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
|
||||
let iox_object_store = database.iox_object_store().unwrap();
|
||||
let timestamp = Time::from_timestamp(295293, 3);
|
||||
|
||||
let comment = "I bequeath this database unto the universe";
|
||||
|
||||
database
|
||||
.disown(Some("I don't like this database anymore".into()))
|
||||
.disown(
|
||||
vec![google::protobuf::Any {
|
||||
type_url: protobuf_type_url("google.protobuf.StringValue"),
|
||||
value: comment.into(),
|
||||
}],
|
||||
timestamp,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1701,6 +1758,15 @@ mod tests {
|
|||
let owner_info = fetch_owner_info(&iox_object_store).await.unwrap();
|
||||
assert_eq!(owner_info.id, 0);
|
||||
assert_eq!(owner_info.location, "");
|
||||
let transaction = &owner_info.transactions[0];
|
||||
assert_eq!(transaction.id, server_id.get_u32());
|
||||
assert_eq!(transaction.location, server_location);
|
||||
let expected_timestamp: google::protobuf::Timestamp = timestamp.date_time().into();
|
||||
assert_eq!(transaction.timestamp, Some(expected_timestamp));
|
||||
assert_eq!(transaction.context.len(), 1);
|
||||
|
||||
let context = transaction.context[0].value.slice(..);
|
||||
assert_eq!(std::str::from_utf8(&context).unwrap(), comment);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -82,7 +82,10 @@ use data_types::{
|
|||
};
|
||||
use database::{Database, DatabaseConfig};
|
||||
use futures::future::{BoxFuture, Future, FutureExt, Shared, TryFutureExt};
|
||||
use generated_types::{google::FieldViolation, influxdata::iox::management};
|
||||
use generated_types::{
|
||||
google::{self, FieldViolation},
|
||||
influxdata::iox::management,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use internal_types::freezable::Freezable;
|
||||
use iox_object_store::IoxObjectStore;
|
||||
|
@ -766,7 +769,7 @@ where
|
|||
&self,
|
||||
db_name: &DatabaseName<'static>,
|
||||
uuid: Option<Uuid>,
|
||||
context: Option<String>,
|
||||
context: Vec<google::protobuf::Any>,
|
||||
) -> Result<Uuid> {
|
||||
// Wait for exclusive access to mutate server state
|
||||
let handle_fut = self.shared.state.read().freeze();
|
||||
|
@ -788,8 +791,10 @@ where
|
|||
.fail();
|
||||
}
|
||||
|
||||
let timestamp = self.shared.application.time_provider().now();
|
||||
|
||||
let returned_uuid = database
|
||||
.disown(context)
|
||||
.disown(context, timestamp)
|
||||
.await
|
||||
.context(CannotDisownDatabase)?;
|
||||
|
||||
|
@ -1801,6 +1806,7 @@ mod tests {
|
|||
let owner_info = management::v1::OwnerInfo {
|
||||
id: server_id.get_u32(),
|
||||
location: IoxObjectStore::server_config_path(object_store, server_id).to_string(),
|
||||
transactions: vec![],
|
||||
};
|
||||
let mut encoded_owner_info = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(
|
||||
|
@ -2259,6 +2265,7 @@ mod tests {
|
|||
let owner_info = management::v1::OwnerInfo {
|
||||
id: 2,
|
||||
location: "2/config.pb".to_string(),
|
||||
transactions: vec![],
|
||||
};
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
|
@ -2451,7 +2458,7 @@ mod tests {
|
|||
|
||||
// disown database by name
|
||||
let disowned_uuid = server
|
||||
.disown_database(&foo_db_name, None, None)
|
||||
.disown_database(&foo_db_name, None, vec![])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(first_foo_uuid, disowned_uuid);
|
||||
|
@ -2472,14 +2479,14 @@ mod tests {
|
|||
let incorrect_uuid = Uuid::new_v4();
|
||||
assert_error!(
|
||||
server
|
||||
.disown_database(&foo_db_name, Some(incorrect_uuid), None)
|
||||
.disown_database(&foo_db_name, Some(incorrect_uuid), vec![])
|
||||
.await,
|
||||
Error::UuidMismatch { .. }
|
||||
);
|
||||
|
||||
// disown database specifying UUID works if UUID *does* match
|
||||
server
|
||||
.disown_database(&foo_db_name, Some(second_foo_uuid), None)
|
||||
.disown_database(&foo_db_name, Some(second_foo_uuid), vec![])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -2497,7 +2504,7 @@ mod tests {
|
|||
server.wait_for_init().await.unwrap();
|
||||
|
||||
assert_error!(
|
||||
server.disown_database(&foo_db_name, None, None).await,
|
||||
server.disown_database(&foo_db_name, None, vec![]).await,
|
||||
Error::DatabaseNotFound { .. },
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue