feat: Store only the database rules sent by the client (do not store default values) (#2430)
* feat: add omit_default to protobuf definition * feat: Persist only the client provided rules * fix: Remove race conditions * fix: merge confit * refactor: do not use macro * refactor: restore use of glob import * fix: review comments Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
317a83fc50
commit
779b027271
|
@ -1,5 +1,5 @@
|
|||
/// Simple representation of the state a database can be in.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub enum DatabaseStateCode {
|
||||
/// Database is known but nothing is loaded.
|
||||
Known,
|
||||
|
|
|
@ -17,6 +17,10 @@ service ManagementService {
|
|||
|
||||
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse);
|
||||
|
||||
// Return a specific database by name
|
||||
//
|
||||
// Roughly follows the <https://google.aip.dev/131> pattern, except
|
||||
// we wrap the response
|
||||
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse);
|
||||
|
||||
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
|
||||
|
@ -113,7 +117,16 @@ message ListDatabasesResponse {
|
|||
}
|
||||
|
||||
message GetDatabaseRequest {
|
||||
// The name of the database to retrieve
|
||||
string name = 1;
|
||||
|
||||
// If false, or unspecified: return the current configuration that is being used by
|
||||
// the server, with all default values filled in.
|
||||
//
|
||||
// If true, returns only the persisted configuration (aka only
|
||||
// fields which were was supplied when the database was created or
|
||||
// last modified via UpdateDatabase)
|
||||
bool omit_defaults = 2;
|
||||
}
|
||||
|
||||
message GetDatabaseResponse {
|
||||
|
|
|
@ -117,32 +117,32 @@ impl TryFrom<management::RoutingConfig> for RoutingConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Wrapper around a `prost` error so that
|
||||
/// users of this crate do not have a direct dependency
|
||||
/// on the prost crate.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DecodeError {
|
||||
#[error("failed to decode protobuf: {0}")]
|
||||
DecodeError(#[from] prost::DecodeError),
|
||||
|
||||
#[error("validation failed: {0}")]
|
||||
ValidationError(#[from] FieldViolation),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum EncodeError {
|
||||
pub enum ProstError {
|
||||
#[error("failed to encode protobuf: {0}")]
|
||||
EncodeError(#[from] prost::EncodeError),
|
||||
|
||||
#[error("failed to decode protobuf: {0}")]
|
||||
DecodeError(#[from] prost::DecodeError),
|
||||
}
|
||||
|
||||
pub fn decode_database_rules(bytes: prost::bytes::Bytes) -> Result<DatabaseRules, DecodeError> {
|
||||
let message: management::DatabaseRules = prost::Message::decode(bytes)?;
|
||||
Ok(message.try_into()?)
|
||||
/// Decode datbase rules that were encoded using `encode_database_rules`
|
||||
pub fn decode_database_rules(
|
||||
bytes: prost::bytes::Bytes,
|
||||
) -> Result<management::DatabaseRules, ProstError> {
|
||||
Ok(prost::Message::decode(bytes)?)
|
||||
}
|
||||
|
||||
/// Encode database rules into a serialized format suitable for
|
||||
/// storage in objet store
|
||||
pub fn encode_database_rules(
|
||||
rules: DatabaseRules,
|
||||
rules: &management::DatabaseRules,
|
||||
bytes: &mut prost::bytes::BytesMut,
|
||||
) -> Result<(), EncodeError> {
|
||||
let encoded: management::DatabaseRules = rules.into();
|
||||
Ok(prost::Message::encode(&encoded, bytes)?)
|
||||
) -> Result<(), ProstError> {
|
||||
Ok(prost::Message::encode(rules, bytes)?)
|
||||
}
|
||||
|
||||
impl From<WriteBufferConnection> for management::database_rules::WriteBufferConnection {
|
||||
|
|
|
@ -514,13 +514,25 @@ impl Client {
|
|||
}
|
||||
|
||||
/// Get database configuration
|
||||
///
|
||||
/// If `omit_defaults` is false, return the current configuration
|
||||
/// that is being used by the server, with all default values
|
||||
/// filled in.
|
||||
///
|
||||
/// If `omit_defaults` is true, returns only the persisted configuration (aka only
|
||||
/// fields which were was supplied when the database was created
|
||||
/// or last modified via UpdateDatabase)
|
||||
pub async fn get_database(
|
||||
&mut self,
|
||||
name: impl Into<String> + Send,
|
||||
omit_defaults: bool,
|
||||
) -> Result<DatabaseRules, GetDatabaseError> {
|
||||
let response = self
|
||||
.inner
|
||||
.get_database(GetDatabaseRequest { name: name.into() })
|
||||
.get_database(GetDatabaseRequest {
|
||||
name: name.into(),
|
||||
omit_defaults,
|
||||
})
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::NotFound => GetDatabaseError::DatabaseNotFound,
|
||||
|
|
|
@ -3,26 +3,22 @@ use crate::{
|
|||
load::{create_preserved_catalog, load_or_create_preserved_catalog},
|
||||
DatabaseToCommit,
|
||||
},
|
||||
rules::ProvidedDatabaseRules,
|
||||
ApplicationState, Db,
|
||||
};
|
||||
use bytes::BytesMut;
|
||||
use data_types::{
|
||||
database_rules::DatabaseRules, database_state::DatabaseStateCode, server_id::ServerId,
|
||||
DatabaseName,
|
||||
};
|
||||
use data_types::{database_state::DatabaseStateCode, server_id::ServerId, DatabaseName};
|
||||
use futures::{
|
||||
future::{BoxFuture, Shared},
|
||||
FutureExt, TryFutureExt,
|
||||
};
|
||||
use generated_types::database_rules::encode_database_rules;
|
||||
use internal_types::freezable::Freezable;
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use parking_lot::RwLock;
|
||||
use parquet_file::catalog::api::PreservedCatalog;
|
||||
use persistence_windows::checkpoint::ReplayPlan;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::{future::Future, sync::Arc, time::Duration};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::{future::Future, ops::DerefMut, sync::Arc, time::Duration};
|
||||
use tokio::{sync::Notify, task::JoinError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
|
@ -67,10 +63,19 @@ pub enum Error {
|
|||
db_name: String,
|
||||
source: Box<InitError>,
|
||||
},
|
||||
|
||||
#[snafu(display("cannot update database rules for {} in state {}", db_name, state))]
|
||||
RulesNotUpdateable {
|
||||
db_name: String,
|
||||
state: DatabaseStateCode,
|
||||
},
|
||||
|
||||
#[snafu(display("cannot persisted updated rules: {}", source))]
|
||||
CannotPersistUpdatedRules { source: crate::rules::Error },
|
||||
}
|
||||
|
||||
/// A `Database` represents a single configured IOx database - i.e. an entity with a corresponding
|
||||
/// set of `DatabaseRules`.
|
||||
/// A `Database` represents a single configured IOx database - i.e. an
|
||||
/// entity with a corresponding set of `DatabaseRules`.
|
||||
///
|
||||
/// `Database` composes together the various subsystems responsible for implementing
|
||||
/// `DatabaseRules` and handles their startup and shutdown. This includes instance-local
|
||||
|
@ -87,6 +92,8 @@ pub struct Database {
|
|||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
/// Informatation about where a database is located on object store,
|
||||
/// and how to perform startup activities.
|
||||
pub struct DatabaseConfig {
|
||||
pub name: DatabaseName<'static>,
|
||||
pub server_id: ServerId,
|
||||
|
@ -119,20 +126,23 @@ impl Database {
|
|||
Self { join, shared }
|
||||
}
|
||||
|
||||
/// Create fresh database w/o any state.
|
||||
/// Create fresh database without any any state.
|
||||
pub async fn create(
|
||||
application: Arc<ApplicationState>,
|
||||
rules: DatabaseRules,
|
||||
provided_rules: &ProvidedDatabaseRules,
|
||||
server_id: ServerId,
|
||||
) -> Result<(), InitError> {
|
||||
let db_name = rules.name.clone();
|
||||
let db_name = provided_rules.db_name();
|
||||
let iox_object_store = Arc::new(
|
||||
IoxObjectStore::new(Arc::clone(application.object_store()), server_id, &db_name)
|
||||
IoxObjectStore::new(Arc::clone(application.object_store()), server_id, db_name)
|
||||
.await
|
||||
.context(IoxObjectStoreError)?,
|
||||
);
|
||||
|
||||
persist_database_rules(&iox_object_store, rules).await?;
|
||||
provided_rules
|
||||
.persist(&iox_object_store)
|
||||
.await
|
||||
.context(SavingRules)?;
|
||||
|
||||
create_preserved_catalog(
|
||||
db_name.as_str(),
|
||||
|
@ -179,8 +189,78 @@ impl Database {
|
|||
}
|
||||
|
||||
/// Returns the database rules if they're loaded
|
||||
pub fn rules(&self) -> Option<Arc<DatabaseRules>> {
|
||||
self.shared.state.read().rules()
|
||||
pub fn provided_rules(&self) -> Option<Arc<ProvidedDatabaseRules>> {
|
||||
self.shared.state.read().provided_rules()
|
||||
}
|
||||
|
||||
/// Update the database rules, panic'ing if the state is invalid
|
||||
pub async fn update_provided_rules(
|
||||
&self,
|
||||
new_provided_rules: Arc<ProvidedDatabaseRules>,
|
||||
) -> Result<(), Error> {
|
||||
// get a handle to signal our intention to update the state
|
||||
let (handle, iox_object_store) = {
|
||||
// scope so we drop the read lock
|
||||
let state = self.shared.state.read();
|
||||
let state_code = state.state_code();
|
||||
|
||||
// A handle to the object store so we can update the rules
|
||||
// in object store prior to obtaining exclusive write
|
||||
// acces to the `DatabaseState`
|
||||
let iox_object_store = state.iox_object_store().context(RulesNotUpdateable {
|
||||
db_name: new_provided_rules.db_name(),
|
||||
state: state_code,
|
||||
})?;
|
||||
|
||||
// ensure the database is in initialized state (acquiring
|
||||
// the freeze handle ensures this state remains the same
|
||||
// once we get the write lock below
|
||||
ensure!(
|
||||
state_code == DatabaseStateCode::Initialized,
|
||||
RulesNotUpdateable {
|
||||
db_name: new_provided_rules.db_name(),
|
||||
state: state_code,
|
||||
}
|
||||
);
|
||||
|
||||
let handle = state.try_freeze().context(RulesNotUpdateable {
|
||||
db_name: new_provided_rules.db_name(),
|
||||
state: state_code,
|
||||
})?;
|
||||
(handle, iox_object_store)
|
||||
};
|
||||
|
||||
// Attempt to persist to object store, if that fails, roll
|
||||
// back the whole transaction (leave the rules unchanged).
|
||||
//
|
||||
// The fact the freeze handle is held here ensures only one
|
||||
// one task can ever be in this code at any time. Another
|
||||
// concurrent attempt would error in the `try_freeze()`
|
||||
// call above.
|
||||
new_provided_rules
|
||||
.persist(&iox_object_store)
|
||||
.await
|
||||
.context(CannotPersistUpdatedRules)?;
|
||||
|
||||
let mut state = self.shared.state.write();
|
||||
|
||||
// Exchange FreezeHandle for mutable access via WriteGuard,
|
||||
// nothing else can mess with the database state now as we
|
||||
// change the rules
|
||||
let mut state = state.unfreeze(handle);
|
||||
|
||||
if let DatabaseState::Initialized(DatabaseStateInitialized { db, provided_rules }) =
|
||||
state.deref_mut()
|
||||
{
|
||||
db.update_rules(Arc::clone(new_provided_rules.rules()));
|
||||
*provided_rules = new_provided_rules;
|
||||
Ok(())
|
||||
} else {
|
||||
// The fact we have a handle should have prevented any
|
||||
// changes to the database state between when it was
|
||||
// checked above and now
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the IoxObjectStore if it has been found
|
||||
|
@ -338,7 +418,9 @@ struct DatabaseShared {
|
|||
/// Application-global state
|
||||
application: Arc<ApplicationState>,
|
||||
|
||||
/// The state of the `Database`
|
||||
/// The state of the `Database`, wrapped in a `Freezable` to
|
||||
/// ensure there is only one task with an outstanding intent to
|
||||
/// write at any time.
|
||||
state: RwLock<Freezable<DatabaseState>>,
|
||||
|
||||
/// Notify that the database state has changed
|
||||
|
@ -515,14 +597,6 @@ pub enum InitError {
|
|||
#[snafu(display("no active generation directory found, not loading"))]
|
||||
NoActiveDatabase,
|
||||
|
||||
#[snafu(display("error fetching rules: {}", source))]
|
||||
RulesFetch { source: object_store::Error },
|
||||
|
||||
#[snafu(display("error decoding database rules: {}", source))]
|
||||
RulesDecode {
|
||||
source: generated_types::database_rules::DecodeError,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Database names in deserialized rules ({}) does not match expected value ({})",
|
||||
actual,
|
||||
|
@ -541,19 +615,17 @@ pub enum InitError {
|
|||
#[snafu(display("error during replay: {}", source))]
|
||||
Replay { source: crate::db::Error },
|
||||
|
||||
#[snafu(display("store error: {}", source))]
|
||||
StoreError { source: object_store::Error },
|
||||
#[snafu(display("error saving database rules: {}", source))]
|
||||
SavingRules { source: crate::rules::Error },
|
||||
|
||||
#[snafu(display("error loading database rules: {}", source))]
|
||||
LoadingRules { source: crate::rules::Error },
|
||||
|
||||
#[snafu(display("{}", source))]
|
||||
IoxObjectStoreError {
|
||||
source: iox_object_store::IoxObjectStoreError,
|
||||
},
|
||||
|
||||
#[snafu(display("error serializing database rules to protobuf: {}", source))]
|
||||
ErrorSerializingRulesProtobuf {
|
||||
source: generated_types::database_rules::EncodeError,
|
||||
},
|
||||
|
||||
#[snafu(display("cannot create preserved catalog: {}", source))]
|
||||
CannotCreatePreservedCatalog { source: crate::db::load::Error },
|
||||
}
|
||||
|
@ -618,7 +690,7 @@ impl DatabaseState {
|
|||
}
|
||||
}
|
||||
|
||||
fn rules(&self) -> Option<Arc<DatabaseRules>> {
|
||||
fn provided_rules(&self) -> Option<Arc<ProvidedDatabaseRules>> {
|
||||
match self {
|
||||
DatabaseState::Known(_)
|
||||
| DatabaseState::DatabaseObjectStoreFound(_)
|
||||
|
@ -626,12 +698,12 @@ impl DatabaseState {
|
|||
| DatabaseState::NoActiveDatabase(_, _)
|
||||
| DatabaseState::RulesLoadError(_, _) => None,
|
||||
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
|
||||
Some(Arc::clone(&state.rules))
|
||||
Some(Arc::clone(&state.provided_rules))
|
||||
}
|
||||
DatabaseState::CatalogLoaded(state) | DatabaseState::ReplayError(state, _) => {
|
||||
Some(state.db.rules())
|
||||
Some(Arc::clone(&state.provided_rules))
|
||||
}
|
||||
DatabaseState::Initialized(state) => Some(state.db.rules()),
|
||||
DatabaseState::Initialized(state) => Some(Arc::clone(&state.provided_rules)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -697,25 +769,19 @@ impl DatabaseStateDatabaseObjectStoreFound {
|
|||
&self,
|
||||
shared: &DatabaseShared,
|
||||
) -> Result<DatabaseStateRulesLoaded, InitError> {
|
||||
// TODO: Retry this
|
||||
let bytes = self
|
||||
.iox_object_store
|
||||
.get_database_rules_file()
|
||||
let rules = ProvidedDatabaseRules::load(&self.iox_object_store)
|
||||
.await
|
||||
.context(RulesFetch)?;
|
||||
.context(LoadingRules)?;
|
||||
|
||||
let rules =
|
||||
generated_types::database_rules::decode_database_rules(bytes).context(RulesDecode)?;
|
||||
|
||||
if rules.name != shared.config.name {
|
||||
if rules.db_name() != &shared.config.name {
|
||||
return Err(InitError::RulesDatabaseNameMismatch {
|
||||
actual: rules.name.to_string(),
|
||||
actual: rules.db_name().to_string(),
|
||||
expected: shared.config.name.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(DatabaseStateRulesLoaded {
|
||||
rules: Arc::new(rules),
|
||||
provided_rules: Arc::new(rules),
|
||||
iox_object_store: Arc::clone(&self.iox_object_store),
|
||||
})
|
||||
}
|
||||
|
@ -723,7 +789,7 @@ impl DatabaseStateDatabaseObjectStoreFound {
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
struct DatabaseStateRulesLoaded {
|
||||
rules: Arc<DatabaseRules>,
|
||||
provided_rules: Arc<ProvidedDatabaseRules>,
|
||||
iox_object_store: Arc<IoxObjectStore>,
|
||||
}
|
||||
|
||||
|
@ -747,7 +813,10 @@ impl DatabaseStateRulesLoaded {
|
|||
let write_buffer = shared
|
||||
.application
|
||||
.write_buffer_factory()
|
||||
.new_config(shared.config.server_id, self.rules.as_ref())
|
||||
.new_config(
|
||||
shared.config.server_id,
|
||||
self.provided_rules.rules().as_ref(),
|
||||
)
|
||||
.await
|
||||
.context(CreateWriteBuffer)?;
|
||||
|
||||
|
@ -755,7 +824,7 @@ impl DatabaseStateRulesLoaded {
|
|||
server_id: shared.config.server_id,
|
||||
iox_object_store: Arc::clone(&self.iox_object_store),
|
||||
exec: Arc::clone(shared.application.executor()),
|
||||
rules: Arc::clone(&self.rules),
|
||||
rules: Arc::clone(self.provided_rules.rules()),
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
write_buffer,
|
||||
|
@ -769,6 +838,7 @@ impl DatabaseStateRulesLoaded {
|
|||
Ok(DatabaseStateCatalogLoaded {
|
||||
db,
|
||||
replay_plan: Arc::new(replay_plan),
|
||||
provided_rules: Arc::clone(&self.provided_rules),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -777,6 +847,7 @@ impl DatabaseStateRulesLoaded {
|
|||
struct DatabaseStateCatalogLoaded {
|
||||
db: Arc<Db>,
|
||||
replay_plan: Arc<Option<ReplayPlan>>,
|
||||
provided_rules: Arc<ProvidedDatabaseRules>,
|
||||
}
|
||||
|
||||
impl DatabaseStateCatalogLoaded {
|
||||
|
@ -792,28 +863,17 @@ impl DatabaseStateCatalogLoaded {
|
|||
db.unsuppress_persistence().await;
|
||||
db.allow_write_buffer_read();
|
||||
|
||||
Ok(DatabaseStateInitialized { db })
|
||||
Ok(DatabaseStateInitialized {
|
||||
db,
|
||||
provided_rules: Arc::clone(&self.provided_rules),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct DatabaseStateInitialized {
|
||||
db: Arc<Db>,
|
||||
}
|
||||
|
||||
/// Persist the the `DatabaseRules` given the database object storage
|
||||
pub(super) async fn persist_database_rules(
|
||||
object_store: &IoxObjectStore,
|
||||
rules: DatabaseRules,
|
||||
) -> Result<(), InitError> {
|
||||
let mut data = BytesMut::new();
|
||||
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
|
||||
|
||||
object_store
|
||||
.put_database_rules_file(data.freeze())
|
||||
.await
|
||||
.context(StoreError)?;
|
||||
Ok(())
|
||||
provided_rules: Arc<ProvidedDatabaseRules>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -825,7 +885,11 @@ mod tests {
|
|||
use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
|
||||
|
||||
use super::*;
|
||||
use std::{convert::TryFrom, num::NonZeroU32, time::Instant};
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
num::NonZeroU32,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_shutdown_waits_for_jobs() {
|
||||
|
@ -922,7 +986,7 @@ mod tests {
|
|||
|
||||
// setup DB
|
||||
let db_name = DatabaseName::new("test_db").unwrap();
|
||||
let rules = DatabaseRules {
|
||||
let rules = data_types::database_rules::DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
partition_template: partition_template.clone(),
|
||||
lifecycle_rules: data_types::database_rules::LifecycleRules {
|
||||
|
@ -935,9 +999,13 @@ mod tests {
|
|||
"mock://my_mock".to_string(),
|
||||
)),
|
||||
};
|
||||
Database::create(Arc::clone(&application), rules, server_id)
|
||||
.await
|
||||
.unwrap();
|
||||
Database::create(
|
||||
Arc::clone(&application),
|
||||
&make_provided_rules(rules),
|
||||
server_id,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let db_config = DatabaseConfig {
|
||||
name: db_name,
|
||||
server_id,
|
||||
|
@ -1031,4 +1099,17 @@ mod tests {
|
|||
database.shutdown();
|
||||
database.join().await.unwrap();
|
||||
}
|
||||
|
||||
/// Normally database rules are provided as grpc messages, but in
|
||||
/// tests they are constructed from database rules structures
|
||||
/// themselves.
|
||||
fn make_provided_rules(
|
||||
rules: data_types::database_rules::DatabaseRules,
|
||||
) -> ProvidedDatabaseRules {
|
||||
let rules: generated_types::influxdata::iox::management::v1::DatabaseRules =
|
||||
rules.try_into().unwrap();
|
||||
|
||||
let provided_rules: ProvidedDatabaseRules = rules.try_into().unwrap();
|
||||
provided_rules
|
||||
}
|
||||
}
|
||||
|
|
|
@ -489,19 +489,15 @@ impl Db {
|
|||
}
|
||||
|
||||
/// Updates the database rules
|
||||
pub fn update_rules<F, E>(&self, update: F) -> Result<Arc<DatabaseRules>, E>
|
||||
where
|
||||
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E>,
|
||||
{
|
||||
let (late_arrive_window_updated, new_rules) = {
|
||||
pub fn update_rules(&self, new_rules: Arc<DatabaseRules>) {
|
||||
let late_arrive_window_updated = {
|
||||
let mut rules = self.rules.write();
|
||||
info!(db_name=%rules.name, "updating rules for database");
|
||||
let new_rules = Arc::new(update(rules.as_ref().clone())?);
|
||||
let late_arrive_window_updated = rules.lifecycle_rules.late_arrive_window_seconds
|
||||
!= new_rules.lifecycle_rules.late_arrive_window_seconds;
|
||||
|
||||
*rules = Arc::clone(&new_rules);
|
||||
(late_arrive_window_updated, new_rules)
|
||||
late_arrive_window_updated
|
||||
};
|
||||
|
||||
if late_arrive_window_updated {
|
||||
|
@ -522,8 +518,6 @@ impl Db {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(new_rules)
|
||||
}
|
||||
|
||||
/// Return the current database's object storage
|
||||
|
|
|
@ -70,13 +70,13 @@
|
|||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardConfig, ShardId, Sink},
|
||||
database_rules::{NodeGroup, RoutingRules, ShardConfig, ShardId, Sink},
|
||||
error::ErrorLogger,
|
||||
job::Job,
|
||||
server_id::ServerId,
|
||||
{DatabaseName, DatabaseNameError},
|
||||
};
|
||||
use database::{persist_database_rules, Database, DatabaseConfig};
|
||||
use database::{Database, DatabaseConfig};
|
||||
use entry::{lines_to_sharded_entries, pb_to_entry, Entry, ShardedEntry};
|
||||
use futures::future::{BoxFuture, Future, FutureExt, Shared, TryFutureExt};
|
||||
use generated_types::influxdata::pbdata::v1 as pb;
|
||||
|
@ -90,6 +90,7 @@ use observability_deps::tracing::{error, info, warn};
|
|||
use parking_lot::RwLock;
|
||||
use rand::seq::SliceRandom;
|
||||
use resolver::Resolver;
|
||||
use rules::ProvidedDatabaseRules;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use tokio::{sync::Notify, task::JoinError};
|
||||
|
@ -109,6 +110,7 @@ pub mod db;
|
|||
mod job;
|
||||
mod resolver;
|
||||
|
||||
pub mod rules;
|
||||
/// Utility modules used by benchmarks and tests
|
||||
pub mod utils;
|
||||
|
||||
|
@ -130,8 +132,11 @@ pub enum Error {
|
|||
#[snafu(display("database not initialized"))]
|
||||
DatabaseNotInitialized { db_name: String },
|
||||
|
||||
#[snafu(display("cannot persisted updated rules: {}", source))]
|
||||
CannotPersistUpdatedRules { source: crate::database::InitError },
|
||||
#[snafu(display("cannot update database rules"))]
|
||||
CanNotUpdateRules {
|
||||
db_name: String,
|
||||
source: crate::database::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("cannot create database: {}", source))]
|
||||
CannotCreateDatabase { source: crate::database::InitError },
|
||||
|
@ -516,18 +521,6 @@ impl ServerStateInitialized {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum UpdateError<E> {
|
||||
Update(Error),
|
||||
Closure(E),
|
||||
}
|
||||
|
||||
impl<E> From<Error> for UpdateError<E> {
|
||||
fn from(e: Error) -> Self {
|
||||
Self::Update(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> Server<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync,
|
||||
|
@ -675,8 +668,11 @@ where
|
|||
/// Tells the server the set of rules for a database.
|
||||
///
|
||||
/// Waits until the database has initialized or failed to do so
|
||||
pub async fn create_database(&self, rules: DatabaseRules) -> Result<Arc<Database>> {
|
||||
let db_name = rules.name.clone();
|
||||
pub async fn create_database(
|
||||
&self,
|
||||
provided_rules: ProvidedDatabaseRules,
|
||||
) -> Result<Arc<Database>> {
|
||||
let db_name = provided_rules.db_name();
|
||||
|
||||
// Wait for exclusive access to mutate server state
|
||||
let handle_fut = self.shared.state.read().freeze();
|
||||
|
@ -686,7 +682,7 @@ where
|
|||
let state = self.shared.state.read();
|
||||
let initialized = state.initialized()?;
|
||||
|
||||
if let Some(existing) = initialized.databases.get(&rules.name) {
|
||||
if let Some(existing) = initialized.databases.get(db_name) {
|
||||
if let Some(init_error) = existing.init_error() {
|
||||
if !matches!(&*init_error, database::InitError::NoActiveDatabase) {
|
||||
return Err(Error::DatabaseAlreadyExists {
|
||||
|
@ -702,9 +698,13 @@ where
|
|||
initialized.server_id
|
||||
};
|
||||
|
||||
Database::create(Arc::clone(&self.shared.application), rules, server_id)
|
||||
.await
|
||||
.context(CannotCreateDatabase)?;
|
||||
Database::create(
|
||||
Arc::clone(&self.shared.application),
|
||||
&provided_rules,
|
||||
server_id,
|
||||
)
|
||||
.await
|
||||
.context(CannotCreateDatabase)?;
|
||||
|
||||
let database = {
|
||||
let mut state = self.shared.state.write();
|
||||
|
@ -717,7 +717,7 @@ where
|
|||
.new_database(
|
||||
&self.shared,
|
||||
DatabaseConfig {
|
||||
name: db_name,
|
||||
name: db_name.clone(),
|
||||
server_id,
|
||||
wipe_catalog_on_error: false,
|
||||
skip_replay: false,
|
||||
|
@ -952,29 +952,22 @@ where
|
|||
}
|
||||
|
||||
/// Update database rules and save on success.
|
||||
pub async fn update_db_rules<F, E>(
|
||||
pub async fn update_db_rules(
|
||||
&self,
|
||||
db_name: &DatabaseName<'_>,
|
||||
update: F,
|
||||
) -> std::result::Result<Arc<DatabaseRules>, UpdateError<E>>
|
||||
where
|
||||
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E> + Send,
|
||||
{
|
||||
// TODO: Move into Database (#2053)
|
||||
provided_rules: ProvidedDatabaseRules,
|
||||
) -> Result<Arc<ProvidedDatabaseRules>> {
|
||||
let provided_rules = Arc::new(provided_rules);
|
||||
|
||||
let database = self.database(db_name)?;
|
||||
let db = database
|
||||
.initialized_db()
|
||||
.ok_or(Error::DatabaseNotInitialized {
|
||||
db_name: db_name.to_string(),
|
||||
})?;
|
||||
|
||||
let rules = db.update_rules(update).map_err(UpdateError::Closure)?;
|
||||
|
||||
// TODO: Handle failure
|
||||
persist_database_rules(&db.iox_object_store(), rules.as_ref().clone())
|
||||
// attempt to save provided rules in the current state
|
||||
database
|
||||
.update_provided_rules(Arc::clone(&provided_rules))
|
||||
.await
|
||||
.context(CannotPersistUpdatedRules)?;
|
||||
Ok(rules)
|
||||
.context(CanNotUpdateRules { db_name })?;
|
||||
|
||||
Ok(provided_rules)
|
||||
}
|
||||
|
||||
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
|
||||
|
@ -1212,7 +1205,7 @@ where
|
|||
let db = match self.db(&db_name) {
|
||||
Ok(db) => db,
|
||||
Err(Error::DatabaseNotFound { .. }) => {
|
||||
self.create_database(DatabaseRules::new(db_name.clone()))
|
||||
self.create_database(ProvidedDatabaseRules::new_empty(db_name.clone()))
|
||||
.await?;
|
||||
self.db(&db_name).expect("db not inserted")
|
||||
}
|
||||
|
@ -1233,12 +1226,11 @@ mod tests {
|
|||
use data_types::{
|
||||
chunk_metadata::ChunkAddr,
|
||||
database_rules::{
|
||||
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart,
|
||||
DatabaseRules, HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart,
|
||||
WriteBufferConnection, NO_SHARD_CONFIG,
|
||||
},
|
||||
};
|
||||
use entry::test_helpers::lp_to_entry;
|
||||
use generated_types::database_rules::decode_database_rules;
|
||||
use influxdb_line_protocol::parse_lines;
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use metrics::TestMetricRegistry;
|
||||
|
@ -1246,7 +1238,7 @@ mod tests {
|
|||
use parquet_file::catalog::{api::PreservedCatalog, test_helpers::TestCatalogState};
|
||||
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase};
|
||||
use std::{
|
||||
convert::{Infallible, TryFrom},
|
||||
convert::{TryFrom, TryInto},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
|
@ -1311,26 +1303,31 @@ mod tests {
|
|||
worker_cleanup_avg_sleep: Duration::from_secs(2),
|
||||
write_buffer_connection: None,
|
||||
};
|
||||
let provided_rules = make_provided_rules(rules);
|
||||
|
||||
// Create a database
|
||||
let bananas = server
|
||||
.create_database(rules.clone())
|
||||
.create_database(provided_rules.clone())
|
||||
.await
|
||||
.expect("failed to create database");
|
||||
|
||||
let read_data = bananas
|
||||
.iox_object_store()
|
||||
.unwrap()
|
||||
.get_database_rules_file()
|
||||
let iox_object_store = bananas.iox_object_store().unwrap();
|
||||
let read_rules = ProvidedDatabaseRules::load(&iox_object_store)
|
||||
.await
|
||||
.unwrap();
|
||||
let read_rules = decode_database_rules(read_data).unwrap();
|
||||
|
||||
assert_eq!(rules, read_rules);
|
||||
// Same rules that were provided are read
|
||||
assert_eq!(provided_rules.original(), read_rules.original());
|
||||
|
||||
// rules that are being used are the same
|
||||
assert_eq!(provided_rules.rules(), read_rules.rules());
|
||||
|
||||
let db2 = DatabaseName::new("db_awesome").unwrap();
|
||||
let rules2 = DatabaseRules::new(db2.clone());
|
||||
|
||||
let provided_rules2 = make_provided_rules(rules2);
|
||||
server
|
||||
.create_database(DatabaseRules::new(db2.clone()))
|
||||
.create_database(provided_rules2)
|
||||
.await
|
||||
.expect("failed to create 2nd db");
|
||||
|
||||
|
@ -1360,13 +1357,13 @@ mod tests {
|
|||
|
||||
// Create a database
|
||||
server
|
||||
.create_database(DatabaseRules::new(name.clone()))
|
||||
.create_database(default_rules(name.clone()))
|
||||
.await
|
||||
.expect("failed to create database");
|
||||
|
||||
// Then try and create another with the same name
|
||||
let got = server
|
||||
.create_database(DatabaseRules::new(name.clone()))
|
||||
.create_database(default_rules(name.clone()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
|
@ -1396,7 +1393,7 @@ mod tests {
|
|||
};
|
||||
|
||||
// Create a database
|
||||
server.create_database(rules.clone()).await
|
||||
server.create_database(make_provided_rules(rules)).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -1447,10 +1444,7 @@ mod tests {
|
|||
let err = bananas_database.wait_for_init().await.unwrap_err();
|
||||
|
||||
assert!(apples_database.init_error().is_none());
|
||||
assert!(matches!(
|
||||
err.as_ref(),
|
||||
database::InitError::RulesFetch { .. }
|
||||
));
|
||||
assert_contains!(err.to_string(), "error fetching rules");
|
||||
assert!(Arc::ptr_eq(&err, &bananas_database.init_error().unwrap()));
|
||||
}
|
||||
|
||||
|
@ -1465,7 +1459,7 @@ mod tests {
|
|||
for name in &names {
|
||||
let name = DatabaseName::new(name.to_string()).unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(name))
|
||||
.create_database(default_rules(name))
|
||||
.await
|
||||
.expect("failed to create database");
|
||||
}
|
||||
|
@ -1482,7 +1476,7 @@ mod tests {
|
|||
|
||||
let db_name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(db_name.clone()))
|
||||
.create_database(default_rules(db_name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1517,7 +1511,7 @@ mod tests {
|
|||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(name.clone()))
|
||||
.create_database(default_rules(name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1599,29 +1593,31 @@ mod tests {
|
|||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(db_name.clone()))
|
||||
.create_database(default_rules(db_name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2];
|
||||
let db = server.db(&db_name).unwrap();
|
||||
db.update_rules(|mut rules| {
|
||||
let shard_config = ShardConfig {
|
||||
hash_ring: Some(HashRing {
|
||||
shards: vec![TEST_SHARD_ID].into(),
|
||||
..Default::default()
|
||||
}),
|
||||
shards: Arc::new(
|
||||
vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
),
|
||||
|
||||
let shard_config = ShardConfig {
|
||||
hash_ring: Some(HashRing {
|
||||
shards: vec![TEST_SHARD_ID].into(),
|
||||
..Default::default()
|
||||
};
|
||||
rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
|
||||
Ok::<_, Infallible>(rules)
|
||||
})
|
||||
.unwrap();
|
||||
}),
|
||||
shards: Arc::new(
|
||||
vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut rules = db.rules().as_ref().clone();
|
||||
rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
|
||||
let rules = Arc::new(rules);
|
||||
|
||||
db.update_rules(rules);
|
||||
|
||||
let line = "cpu bar=1 10";
|
||||
let lines: Vec<_> = parse_lines(line).map(|l| l.unwrap()).collect();
|
||||
|
@ -1677,7 +1673,7 @@ mod tests {
|
|||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(db_name.clone()))
|
||||
.create_database(default_rules(db_name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1764,26 +1760,30 @@ mod tests {
|
|||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(name.clone()))
|
||||
.create_database(default_rules(name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
let db = server.db(&db_name).unwrap();
|
||||
let rules = db
|
||||
.update_rules(|mut rules| {
|
||||
rules.lifecycle_rules.buffer_size_hard =
|
||||
Some(std::num::NonZeroUsize::new(10).unwrap());
|
||||
Ok::<_, Infallible>(rules)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut rules: DatabaseRules = db.rules().as_ref().clone();
|
||||
|
||||
rules.lifecycle_rules.buffer_size_hard = Some(std::num::NonZeroUsize::new(10).unwrap());
|
||||
|
||||
let rules = Arc::new(rules);
|
||||
db.update_rules(Arc::clone(&rules));
|
||||
|
||||
// inserting first line does not trigger hard buffer limit
|
||||
let line_1 = "cpu bar=1 10";
|
||||
let lines_1: Vec<_> = parse_lines(line_1).map(|l| l.unwrap()).collect();
|
||||
let sharded_entries_1 =
|
||||
lines_to_sharded_entries(&lines_1, ARBITRARY_DEFAULT_TIME, NO_SHARD_CONFIG, &*rules)
|
||||
.expect("first sharded entries");
|
||||
let sharded_entries_1 = lines_to_sharded_entries(
|
||||
&lines_1,
|
||||
ARBITRARY_DEFAULT_TIME,
|
||||
NO_SHARD_CONFIG,
|
||||
rules.as_ref(),
|
||||
)
|
||||
.expect("first sharded entries");
|
||||
|
||||
let entry_1 = &sharded_entries_1[0].entry;
|
||||
server
|
||||
|
@ -1913,10 +1913,11 @@ mod tests {
|
|||
assert!(foo_database.init_error().is_none());
|
||||
|
||||
let err = bar_database.wait_for_init().await.unwrap_err();
|
||||
assert!(matches!(
|
||||
err.as_ref(),
|
||||
database::InitError::RulesDecode { .. }
|
||||
));
|
||||
assert_contains!(err.to_string(), "error deserializing database rules");
|
||||
assert_contains!(
|
||||
err.to_string(),
|
||||
"failed to decode Protobuf message: invalid varint"
|
||||
);
|
||||
assert!(Arc::ptr_eq(&err, &bar_database.init_error().unwrap()));
|
||||
|
||||
// can only write to successfully created DBs
|
||||
|
@ -2092,10 +2093,7 @@ mod tests {
|
|||
))
|
||||
} else if name == &db_name_rules_broken {
|
||||
let err = database.wait_for_init().await.unwrap_err();
|
||||
assert!(matches!(
|
||||
err.as_ref(),
|
||||
database::InitError::RulesDecode { .. }
|
||||
))
|
||||
assert_contains!(err.to_string(), "error deserializing database rules");
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
|
@ -2190,7 +2188,7 @@ mod tests {
|
|||
|
||||
// 5. cannot wipe if DB was just created
|
||||
let created = server
|
||||
.create_database(DatabaseRules::new(db_name_created.clone()))
|
||||
.create_database(default_rules(db_name_created.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -2261,7 +2259,10 @@ mod tests {
|
|||
"mock://my_mock".to_string(),
|
||||
)),
|
||||
};
|
||||
server.create_database(rules.clone()).await.unwrap();
|
||||
server
|
||||
.create_database(make_provided_rules(rules))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let entry = lp_to_entry("cpu bar=1 10");
|
||||
|
||||
|
@ -2281,4 +2282,19 @@ mod tests {
|
|||
let physical_plan = planner.query(query, &ctx).unwrap();
|
||||
ctx.collect(physical_plan).await.unwrap()
|
||||
}
|
||||
|
||||
fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules {
|
||||
make_provided_rules(DatabaseRules::new(db_name))
|
||||
}
|
||||
|
||||
/// Normally database rules are provided as grpc messages, but in
|
||||
/// tests they are constructed from database rules structures
|
||||
/// themselves.
|
||||
fn make_provided_rules(rules: DatabaseRules) -> ProvidedDatabaseRules {
|
||||
let rules: generated_types::influxdata::iox::management::v1::DatabaseRules =
|
||||
rules.try_into().unwrap();
|
||||
|
||||
let provided_rules: ProvidedDatabaseRules = rules.try_into().unwrap();
|
||||
provided_rules
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
use std::{convert::TryInto, sync::Arc};
|
||||
|
||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use generated_types::{
|
||||
database_rules::encode_database_rules, google::FieldViolation, influxdata::iox::management,
|
||||
};
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Error saving rules for {}: {}", db_name, source))]
|
||||
ObjectStore {
|
||||
db_name: String,
|
||||
source: object_store::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("error deserializing database rules: {}", source))]
|
||||
Deserialization {
|
||||
source: generated_types::database_rules::ProstError,
|
||||
},
|
||||
|
||||
#[snafu(display("error serializing database rules: {}", source))]
|
||||
Serialization {
|
||||
source: generated_types::database_rules::ProstError,
|
||||
},
|
||||
|
||||
#[snafu(display("error fetching rules: {}", source))]
|
||||
RulesFetch { source: object_store::Error },
|
||||
|
||||
#[snafu(display("error converting grpc to database rules: {}", source))]
|
||||
ConvertingRules { source: FieldViolation },
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// The configuration ([`DatabaseRules`]) used to create and update
|
||||
/// databases, both in original and "materialized" (with defaults filled in) form.
|
||||
///
|
||||
/// The rationale for storing both the rules as they were provided
|
||||
/// *and* materialized form is provide the property that if the same
|
||||
/// rules are sent to a database that were previously sent the
|
||||
/// database will still be runing the same configuration. If the
|
||||
/// materialized configuration was stored, and then the defaults were
|
||||
/// changed in a new version of the software, the required property
|
||||
/// would not hold.
|
||||
///
|
||||
/// While this may sound like an esoteric corner case with little real
|
||||
/// world impact, it has non trivial real world implications for
|
||||
/// keeping the configurations of fleets of IOx servers in sync. See
|
||||
/// <https://github.com/influxdata/influxdb_iox/issues/2409> for
|
||||
/// further gory details.
|
||||
///
|
||||
/// A design goal is to keep the notion of "what user provided" as
|
||||
/// isolated as much as possible so only the server crate worries
|
||||
/// about what the user actually provided and the rest of the system
|
||||
/// can use `internal_types::database_rules::DatabaseRules` in
|
||||
/// blissful ignorance of such subtlties
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProvidedDatabaseRules {
|
||||
/// Full database rules, with all fields set. Derived from
|
||||
/// `encoded` by applying default values.
|
||||
rules: Arc<DatabaseRules>,
|
||||
|
||||
/// Encoded database rules, as provided by the user and as stored
|
||||
/// in the object store (may not have all fields set).
|
||||
original: management::v1::DatabaseRules,
|
||||
}
|
||||
|
||||
impl ProvidedDatabaseRules {
|
||||
// Create a new database with a default database
|
||||
pub fn new_empty(db_name: DatabaseName<'static>) -> Self {
|
||||
let original = management::v1::DatabaseRules {
|
||||
name: db_name.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Should always be able to create a DBRules with default values
|
||||
original.try_into().expect("creating empty rules")
|
||||
}
|
||||
|
||||
/// returns the name of the database in the rules
|
||||
pub fn db_name(&self) -> &DatabaseName<'static> {
|
||||
&self.rules.name
|
||||
}
|
||||
|
||||
/// Return the full database rules
|
||||
pub fn rules(&self) -> &Arc<DatabaseRules> {
|
||||
&self.rules
|
||||
}
|
||||
|
||||
/// Return the original rules provided to this
|
||||
pub fn original(&self) -> &management::v1::DatabaseRules {
|
||||
&self.original
|
||||
}
|
||||
|
||||
/// Load `ProvidedDatabaseRules` from object storage
|
||||
pub async fn load(iox_object_store: &IoxObjectStore) -> Result<Self> {
|
||||
// TODO: Retry this
|
||||
let bytes = iox_object_store
|
||||
.get_database_rules_file()
|
||||
.await
|
||||
.context(RulesFetch)?;
|
||||
|
||||
let new_self = generated_types::database_rules::decode_database_rules(bytes)
|
||||
.context(Deserialization)?
|
||||
.try_into()
|
||||
.context(ConvertingRules)?;
|
||||
|
||||
Ok(new_self)
|
||||
}
|
||||
|
||||
/// Persist the the `ProvidedDatabaseRules` given the database object storage
|
||||
pub async fn persist(&self, iox_object_store: &IoxObjectStore) -> Result<()> {
|
||||
// Note we save the original version
|
||||
let mut data = bytes::BytesMut::new();
|
||||
encode_database_rules(&self.original, &mut data).context(Serialization)?;
|
||||
|
||||
iox_object_store
|
||||
.put_database_rules_file(data.freeze())
|
||||
.await
|
||||
.context(ObjectStore {
|
||||
db_name: &self.rules.name,
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TryInto<ProvidedDatabaseRules> for management::v1::DatabaseRules {
|
||||
type Error = FieldViolation;
|
||||
|
||||
/// Create a new ProvidedDatabaseRules from a grpc message
|
||||
fn try_into(self) -> Result<ProvidedDatabaseRules, Self::Error> {
|
||||
let original = self.clone();
|
||||
let rules: DatabaseRules = self.try_into()?;
|
||||
let rules = Arc::new(rules);
|
||||
|
||||
Ok(ProvidedDatabaseRules { rules, original })
|
||||
}
|
||||
}
|
|
@ -128,6 +128,12 @@ struct List {}
|
|||
struct Get {
|
||||
/// The name of the database
|
||||
name: String,
|
||||
|
||||
/// If false, returns values for all fields, with defaults filled
|
||||
/// in. If true, only returns values which were explicitly set on
|
||||
/// database creation or update
|
||||
#[structopt(long)]
|
||||
omit_defaults: bool,
|
||||
}
|
||||
|
||||
/// Write data into the specified database
|
||||
|
@ -214,8 +220,12 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
println!("{}", databases.join("\n"))
|
||||
}
|
||||
Command::Get(get) => {
|
||||
let Get {
|
||||
name,
|
||||
omit_defaults,
|
||||
} = get;
|
||||
let mut client = management::Client::new(connection);
|
||||
let database = client.get_database(get.name).await?;
|
||||
let database = client.get_database(name, omit_defaults).await?;
|
||||
println!("{}", serde_json::to_string_pretty(&database)?);
|
||||
}
|
||||
Command::Write(write) => {
|
||||
|
|
|
@ -353,6 +353,7 @@ mod tests {
|
|||
use ::http::{header::HeaderName, HeaderValue};
|
||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use influxdb_iox_client::connection::Connection;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use std::convert::TryInto;
|
||||
use std::num::NonZeroU64;
|
||||
use structopt::StructOpt;
|
||||
|
@ -492,7 +493,7 @@ mod tests {
|
|||
// Create a database that won't panic
|
||||
let other_db_name = DatabaseName::new("other").unwrap();
|
||||
server
|
||||
.create_database(DatabaseRules::new(other_db_name.clone()))
|
||||
.create_database(make_rules(&other_db_name))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -517,7 +518,7 @@ mod tests {
|
|||
|
||||
// Create database that will panic in its worker loop
|
||||
server
|
||||
.create_database(DatabaseRules::new(DatabaseName::new("panic_test").unwrap()))
|
||||
.create_database(make_rules("panic_test"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -760,4 +761,15 @@ mod tests {
|
|||
assert_eq!(span.span_context.trace_id().to_u128(), 0x34f8495);
|
||||
assert_eq!(span.parent_span_id.to_u64(), 0x30e34);
|
||||
}
|
||||
|
||||
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
|
||||
let db_name = DatabaseName::new(db_name.into()).unwrap();
|
||||
|
||||
let rules = DatabaseRules::new(db_name);
|
||||
let rules: generated_types::influxdata::iox::management::v1::DatabaseRules =
|
||||
rules.try_into().unwrap();
|
||||
|
||||
let provided_rules: ProvidedDatabaseRules = rules.try_into().unwrap();
|
||||
provided_rules
|
||||
}
|
||||
}
|
||||
|
|
|
@ -964,7 +964,7 @@ where
|
|||
mod tests {
|
||||
use super::*;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
convert::{TryFrom, TryInto},
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
};
|
||||
|
||||
|
@ -976,7 +976,7 @@ mod tests {
|
|||
use metrics::TestMetricRegistry;
|
||||
use object_store::ObjectStore;
|
||||
use serde::de::DeserializeOwned;
|
||||
use server::{db::Db, ApplicationState, ConnectionManagerImpl};
|
||||
use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, ConnectionManagerImpl};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
fn make_application() -> Arc<ApplicationState> {
|
||||
|
@ -1041,9 +1041,7 @@ mod tests {
|
|||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.wait_for_init().await.unwrap();
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
))
|
||||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
@ -1091,9 +1089,7 @@ mod tests {
|
|||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.wait_for_init().await.unwrap();
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MetricsOrg_MetricsBucket").unwrap(),
|
||||
))
|
||||
.create_database(make_rules("MetricsOrg_MetricsBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -1182,9 +1178,7 @@ mod tests {
|
|||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.wait_for_init().await.unwrap();
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
))
|
||||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
@ -1509,13 +1503,22 @@ mod tests {
|
|||
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
app_server.wait_for_init().await.unwrap();
|
||||
app_server
|
||||
.create_database(DatabaseRules::new(
|
||||
DatabaseName::new("MyOrg_MyBucket").unwrap(),
|
||||
))
|
||||
.create_database(make_rules("MyOrg_MyBucket"))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_url = test_server(application, Arc::clone(&app_server));
|
||||
|
||||
(app_server, server_url)
|
||||
}
|
||||
|
||||
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
|
||||
let db_name = DatabaseName::new(db_name.into()).unwrap();
|
||||
|
||||
let rules = DatabaseRules::new(db_name);
|
||||
let rules: generated_types::influxdata::iox::management::v1::DatabaseRules =
|
||||
rules.try_into().unwrap();
|
||||
|
||||
let provided_rules: ProvidedDatabaseRules = rules.try_into().unwrap();
|
||||
provided_rules
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,6 +94,10 @@ pub fn default_database_error_handler(error: server::database::Error) -> tonic::
|
|||
error!(%source, "Unexpected error while wiping catalog");
|
||||
InternalError {}.into()
|
||||
}
|
||||
Error::RulesNotUpdateable { .. } => tonic::Status::failed_precondition(error.to_string()),
|
||||
Error::CannotPersistUpdatedRules { .. } => {
|
||||
tonic::Status::failed_precondition(error.to_string())
|
||||
}
|
||||
Error::SkipReplay { source, .. } => {
|
||||
error!(%source, "Unexpected error skipping replay");
|
||||
InternalError {}.into()
|
||||
|
|
|
@ -3,13 +3,11 @@ use std::fmt::Debug;
|
|||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
|
||||
use generated_types::google::{
|
||||
AlreadyExists, FieldViolation, FieldViolationExt, FromFieldOpt, InternalError, NotFound,
|
||||
};
|
||||
use data_types::{server_id::ServerId, DatabaseName};
|
||||
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
|
||||
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
|
||||
use observability_deps::tracing::info;
|
||||
use query::QueryDatabase;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use server::{ApplicationState, ConnectionManager, DatabaseStore, Error, Server};
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
|
@ -24,33 +22,6 @@ use super::error::{
|
|||
};
|
||||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum UpdateError {
|
||||
Update(server::Error),
|
||||
Closure(tonic::Status),
|
||||
}
|
||||
|
||||
impl From<UpdateError> for Status {
|
||||
fn from(error: UpdateError) -> Self {
|
||||
match error {
|
||||
UpdateError::Update(error) => {
|
||||
info!(?error, "Update error");
|
||||
InternalError {}.into()
|
||||
}
|
||||
UpdateError::Closure(error) => error,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<server::UpdateError<Status>> for UpdateError {
|
||||
fn from(error: server::UpdateError<Status>) -> Self {
|
||||
match error {
|
||||
server::UpdateError::Update(error) => Self::Update(error),
|
||||
server::UpdateError::Closure(error) => Self::Closure(error),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl<M> management_service_server::ManagementService for ManagementService<M>
|
||||
where
|
||||
|
@ -98,16 +69,29 @@ where
|
|||
&self,
|
||||
request: Request<GetDatabaseRequest>,
|
||||
) -> Result<Response<GetDatabaseResponse>, Status> {
|
||||
let name = DatabaseName::new(request.into_inner().name).field("name")?;
|
||||
let GetDatabaseRequest {
|
||||
name,
|
||||
omit_defaults,
|
||||
} = request.into_inner();
|
||||
|
||||
let name = DatabaseName::new(name).field("name")?;
|
||||
let database = self
|
||||
.server
|
||||
.database(&name)
|
||||
.map_err(default_server_error_handler)?;
|
||||
|
||||
match database.rules() {
|
||||
Some(rules) => Ok(Response::new(GetDatabaseResponse {
|
||||
rules: Some(rules.as_ref().clone().into()),
|
||||
})),
|
||||
match database.provided_rules() {
|
||||
Some(provided_rules) => {
|
||||
let rules: DatabaseRules = if omit_defaults {
|
||||
// return rules as originally provided by the user
|
||||
provided_rules.original().clone()
|
||||
} else {
|
||||
// return the active rules (which have all default values filled in)
|
||||
provided_rules.rules().as_ref().clone().into()
|
||||
};
|
||||
|
||||
Ok(Response::new(GetDatabaseResponse { rules: Some(rules) }))
|
||||
}
|
||||
None => {
|
||||
return Err(tonic::Status::unavailable(format!(
|
||||
"Rules have not yet been loaded for database ({})",
|
||||
|
@ -124,11 +108,13 @@ where
|
|||
let rules: DatabaseRules = request
|
||||
.into_inner()
|
||||
.rules
|
||||
.ok_or_else(|| FieldViolation::required(""))
|
||||
.and_then(TryInto::try_into)
|
||||
.map_err(|e| e.scope("rules"))?;
|
||||
.ok_or_else(|| FieldViolation::required("rules"))?;
|
||||
|
||||
match self.server.create_database(rules).await {
|
||||
let provided_rules: ProvidedDatabaseRules = rules
|
||||
.try_into()
|
||||
.map_err(|e: FieldViolation| e.scope("rules"))?;
|
||||
|
||||
match self.server.create_database(provided_rules).await {
|
||||
Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
|
||||
Err(Error::DatabaseAlreadyExists { db_name }) => {
|
||||
return Err(AlreadyExists {
|
||||
|
@ -146,16 +132,24 @@ where
|
|||
&self,
|
||||
request: Request<UpdateDatabaseRequest>,
|
||||
) -> Result<Response<UpdateDatabaseResponse>, Status> {
|
||||
let request = request.into_inner();
|
||||
let rules: DatabaseRules = request.rules.required("rules")?;
|
||||
let db_name = rules.name.clone();
|
||||
let rules: DatabaseRules = request
|
||||
.into_inner()
|
||||
.rules
|
||||
.ok_or_else(|| FieldViolation::required("rules"))?;
|
||||
|
||||
let provided_rules: ProvidedDatabaseRules = rules
|
||||
.try_into()
|
||||
.map_err(|e: FieldViolation| e.scope("rules"))?;
|
||||
|
||||
let db_name = provided_rules.db_name().clone();
|
||||
let updated_rules = self
|
||||
.server
|
||||
.update_db_rules(&db_name, |_orig| Ok(rules))
|
||||
.update_db_rules(&db_name, provided_rules)
|
||||
.await
|
||||
.map_err(UpdateError::from)?;
|
||||
.map_err(default_server_error_handler)?;
|
||||
|
||||
Ok(Response::new(UpdateDatabaseResponse {
|
||||
rules: Some(updated_rules.as_ref().clone().into()),
|
||||
rules: Some(updated_rules.rules().as_ref().clone().into()),
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,11 @@ use generated_types::{
|
|||
database_rules::RoutingRules, database_status::DatabaseState, *,
|
||||
},
|
||||
};
|
||||
use influxdb_iox_client::{management::CreateDatabaseError, operations, write::WriteError};
|
||||
use influxdb_iox_client::{
|
||||
management::{Client, CreateDatabaseError},
|
||||
operations,
|
||||
write::WriteError,
|
||||
};
|
||||
|
||||
use test_helpers::assert_contains;
|
||||
|
||||
|
@ -244,7 +248,7 @@ async fn test_create_get_update_database() {
|
|||
.expect("create database failed");
|
||||
|
||||
let response = client
|
||||
.get_database(&db_name)
|
||||
.get_database(&db_name, false)
|
||||
.await
|
||||
.expect("get database failed");
|
||||
|
||||
|
@ -262,7 +266,7 @@ async fn test_create_get_update_database() {
|
|||
assert_eq!(updated_rules, rules);
|
||||
|
||||
let response = client
|
||||
.get_database(&db_name)
|
||||
.get_database(&db_name, false)
|
||||
.await
|
||||
.expect("get database failed");
|
||||
|
||||
|
@ -272,6 +276,81 @@ async fn test_create_get_update_database() {
|
|||
));
|
||||
}
|
||||
|
||||
/// gets configuration both with and without defaults, and verifies
|
||||
/// that the worker_cleanup_avg_sleep field is the same and that
|
||||
/// lifecycle_rules are not present except when defaults are filled in
|
||||
async fn assert_rule_defaults(client: &mut Client, db_name: &str, provided_rules: &DatabaseRules) {
|
||||
assert!(provided_rules.lifecycle_rules.is_none());
|
||||
|
||||
// Get the configuration, but do not get any defaults
|
||||
// No lifecycle rules should be present
|
||||
let response = client
|
||||
.get_database(db_name, true)
|
||||
.await
|
||||
.expect("get database failed");
|
||||
assert!(response.lifecycle_rules.is_none());
|
||||
assert_eq!(
|
||||
provided_rules.worker_cleanup_avg_sleep,
|
||||
response.worker_cleanup_avg_sleep
|
||||
);
|
||||
|
||||
// Get the configuration, *with* defaults, and the lifecycle rules should be present
|
||||
let response = client
|
||||
.get_database(db_name, false) // with defaults
|
||||
.await
|
||||
.expect("get database failed");
|
||||
assert!(response.lifecycle_rules.is_some());
|
||||
assert_eq!(
|
||||
provided_rules.worker_cleanup_avg_sleep,
|
||||
response.worker_cleanup_avg_sleep
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_get_update_database_omit_defaults() {
|
||||
// Test to ensure that the database remembers only the
|
||||
// configuration that it was sent, not including the default
|
||||
// values
|
||||
let server_fixture = ServerFixture::create_shared().await;
|
||||
let mut client = server_fixture.management_client();
|
||||
|
||||
let db_name = rand_name();
|
||||
|
||||
// Only set the worker cleanup rules.
|
||||
let mut rules = DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
worker_cleanup_avg_sleep: Some(Duration {
|
||||
seconds: 2,
|
||||
nanos: 0,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
client
|
||||
.create_database(rules.clone())
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
assert_rule_defaults(&mut client, &db_name, &rules).await;
|
||||
|
||||
// Now, modify the worker to cleanup rules
|
||||
rules.worker_cleanup_avg_sleep = Some(Duration {
|
||||
seconds: 20,
|
||||
nanos: 0,
|
||||
});
|
||||
let updated_rules = client
|
||||
.update_database(rules.clone())
|
||||
.await
|
||||
.expect("update database failed");
|
||||
assert!(updated_rules.lifecycle_rules.is_some());
|
||||
assert_eq!(
|
||||
rules.worker_cleanup_avg_sleep,
|
||||
updated_rules.worker_cleanup_avg_sleep
|
||||
);
|
||||
|
||||
assert_rule_defaults(&mut client, &db_name, &rules).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_chunk_get() {
|
||||
use generated_types::influxdata::iox::management::v1::{
|
||||
|
@ -1002,7 +1081,7 @@ async fn test_get_server_status_db_error() {
|
|||
let db_status = &status.database_statuses[0];
|
||||
assert_eq!(db_status.db_name, "my_db");
|
||||
assert!(dbg!(&db_status.error.as_ref().unwrap().message)
|
||||
.starts_with("error decoding database rules:"));
|
||||
.contains("error deserializing database rules"));
|
||||
assert_eq!(
|
||||
DatabaseState::from_i32(db_status.state).unwrap(),
|
||||
DatabaseState::DatabaseObjectStoreFound
|
||||
|
|
|
@ -171,7 +171,7 @@ async fn test_update_late_arrival() {
|
|||
influxdb_iox_client::management::generated_types::ChunkStorage::OpenMutableBuffer as i32
|
||||
);
|
||||
|
||||
let mut rules = management.get_database(&db_name).await.unwrap();
|
||||
let mut rules = management.get_database(&db_name, false).await.unwrap();
|
||||
rules
|
||||
.lifecycle_rules
|
||||
.as_mut()
|
||||
|
@ -211,7 +211,10 @@ async fn test_query_chunk_after_restart() {
|
|||
create_readable_database(&db_name, fixture.grpc_channel()).await;
|
||||
|
||||
// enable persistence prior to write
|
||||
let mut rules = management_client.get_database(&db_name).await.unwrap();
|
||||
let mut rules = management_client
|
||||
.get_database(&db_name, false)
|
||||
.await
|
||||
.unwrap();
|
||||
rules.lifecycle_rules = Some({
|
||||
let mut lifecycle_rules = rules.lifecycle_rules.unwrap();
|
||||
lifecycle_rules.persist = true;
|
||||
|
|
|
@ -207,7 +207,7 @@ async fn test_write_routed() {
|
|||
|
||||
// Set sharding rules on the router:
|
||||
let mut router_db_rules = router_mgmt
|
||||
.get_database(&db_name)
|
||||
.get_database(&db_name, false)
|
||||
.await
|
||||
.expect("cannot get database on router");
|
||||
let shard_config = ShardConfig {
|
||||
|
@ -395,7 +395,7 @@ async fn test_write_routed_errors() {
|
|||
|
||||
// Set sharding rules on the router:
|
||||
let mut router_db_rules = router_mgmt
|
||||
.get_database(&db_name)
|
||||
.get_database(&db_name, false)
|
||||
.await
|
||||
.expect("cannot get database on router");
|
||||
let shard_config = ShardConfig {
|
||||
|
@ -464,7 +464,7 @@ async fn test_write_dev_null() {
|
|||
|
||||
// Set sharding rules on the router:
|
||||
let mut router_db_rules = router_mgmt
|
||||
.get_database(&db_name)
|
||||
.get_database(&db_name, false)
|
||||
.await
|
||||
.expect("cannot get database on router");
|
||||
let shard_config = ShardConfig {
|
||||
|
@ -588,7 +588,7 @@ async fn test_write_routed_no_shard() {
|
|||
(db_name_2.clone(), TEST_REMOTE_ID_2),
|
||||
] {
|
||||
let mut router_db_rules = router_mgmt
|
||||
.get_database(db_name)
|
||||
.get_database(db_name, false)
|
||||
.await
|
||||
.expect("cannot get database on router");
|
||||
let routing_config = RoutingConfig {
|
||||
|
|
Loading…
Reference in New Issue