diff --git a/Cargo.lock b/Cargo.lock index e9c72693dc..5564c2f7d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3874,7 +3874,6 @@ dependencies = [ "serde_json", "snafu", "snap", - "tempfile", "test_helpers", "tikv-jemalloc-ctl", "tokio", diff --git a/server/Cargo.toml b/server/Cargo.toml index c331be47bf..8caab9bd17 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -45,7 +45,6 @@ serde = "1.0" serde_json = "1.0" snafu = "0.6" snap = "1.0.0" -tempfile = "3.1.0" tikv-jemalloc-ctl = "0.4.0" tokio = { version = "1.0", features = ["macros", "time"] } tokio-util = { version = "0.6.3" } diff --git a/server/src/database.rs b/server/src/database.rs index 8494e2caa3..fcb6df16be 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -7,6 +7,7 @@ use data_types::server_id::ServerId; use data_types::{database_rules::DatabaseRules, DatabaseName}; use futures::future::{BoxFuture, Shared}; use futures::{FutureExt, TryFutureExt}; +use generated_types::database_rules::encode_database_rules; use internal_types::freezable::Freezable; use object_store::path::{ObjectStorePath, Path}; use observability_deps::tracing::{error, info}; @@ -17,7 +18,7 @@ use tokio::sync::Notify; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; -use crate::db::load::load_or_create_preserved_catalog; +use crate::db::load::{create_preserved_catalog, load_or_create_preserved_catalog}; use crate::db::DatabaseToCommit; use crate::{ApplicationState, Db, DB_RULES_FILE_NAME}; use bytes::BytesMut; @@ -88,6 +89,9 @@ pub struct DatabaseConfig { } impl Database { + /// Create in-mem database object. + /// + /// This is backed by an existing database, which was [created](Self::create) some time in the past. pub fn new(application: Arc, config: DatabaseConfig) -> Self { info!(db_name=%config.name, store_prefix=%config.store_prefix.display(), "new database"); @@ -105,6 +109,30 @@ impl Database { Self { join, shared } } + /// Create fresh database w/o any state. + pub async fn create( + application: Arc, + store_prefix: &Path, + rules: DatabaseRules, + server_id: ServerId, + ) -> Result<(), InitError> { + let db_name = rules.name.clone(); + + persist_database_rules(application.object_store(), store_prefix, rules).await?; + + create_preserved_catalog( + db_name.as_str(), + Arc::clone(application.object_store()), + server_id, + Arc::clone(application.metric_registry()), + true, + ) + .await + .context(CannotCreatePreservedCatalog)?; + + Ok(()) + } + /// Triggers shutdown of this `Database` pub fn shutdown(&self) { info!(db_name=%self.shared.config.name, "database shutting down"); @@ -406,6 +434,17 @@ pub enum InitError { #[snafu(display("error during replay: {}", source))] Replay { source: crate::db::Error }, + + #[snafu(display("store error: {}", source))] + StoreError { source: object_store::Error }, + + #[snafu(display("error serializing database rules to protobuf: {}", source))] + ErrorSerializingRulesProtobuf { + source: generated_types::database_rules::EncodeError, + }, + + #[snafu(display("cannot create preserved catalog: {}", source))] + CannotCreatePreservedCatalog { source: crate::db::load::Error }, } /// The Database startup state machine @@ -610,6 +649,32 @@ async fn get_store_bytes( Ok(bytes.freeze()) } +/// Persist the the `DatabaseRules` given the `Database` store prefix +pub(super) async fn persist_database_rules( + object_store: &ObjectStore, + store_prefix: &Path, + rules: DatabaseRules, +) -> Result<(), InitError> { + let mut data = BytesMut::new(); + encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?; + + let mut location = store_prefix.clone(); + location.set_file_name(DB_RULES_FILE_NAME); + + let len = data.len(); + + let stream_data = std::io::Result::Ok(data.freeze()); + object_store + .put( + &location, + futures::stream::once(async move { stream_data }), + Some(len), + ) + .await + .context(StoreError)?; + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/server/src/lib.rs b/server/src/lib.rs index 58732aa8c3..002d7f5e0c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -71,7 +71,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::BytesMut; use data_types::database_rules::ShardConfig; use data_types::error::ErrorLogger; use data_types::{ @@ -80,11 +79,9 @@ use data_types::{ server_id::ServerId, {DatabaseName, DatabaseNameError}, }; -use database::{Database, DatabaseConfig}; -use db::load::create_preserved_catalog; +use database::{persist_database_rules, Database, DatabaseConfig}; use entry::{lines_to_sharded_entries, pb_to_entry, Entry, ShardedEntry}; use futures::future::{BoxFuture, Future, FutureExt, Shared, TryFutureExt}; -use generated_types::database_rules::encode_database_rules; use generated_types::influxdata::pbdata::v1 as pb; use hashbrown::HashMap; use influxdb_line_protocol::ParsedLine; @@ -140,6 +137,12 @@ pub enum Error { #[snafu(display("database not initialized"))] DatabaseNotInitialized { db_name: String }, + #[snafu(display("cannot persisted updated rules: {}", source))] + CannotPersistUpdatedRules { source: crate::database::InitError }, + + #[snafu(display("cannot create database: {}", source))] + CannotCreateDatabase { source: crate::database::InitError }, + #[snafu(display("database not found"))] DatabaseNotFound { db_name: String }, @@ -176,14 +179,6 @@ pub enum Error { #[snafu(display("error replicating to remote: {}", source))] ErrorReplicating { source: DatabaseError }, - #[snafu(display("error serializing database rules to protobuf: {}", source))] - ErrorSerializingRulesProtobuf { - source: generated_types::database_rules::EncodeError, - }, - - #[snafu(display("store error: {}", source))] - StoreError { source: object_store::Error }, - #[snafu(display("error converting line protocol to flatbuffers: {}", source))] LineConversion { source: entry::Error }, @@ -218,9 +213,6 @@ pub enum Error { source: connection::ConnectionManagerError, }, - #[snafu(display("cannot create preserved catalog: {}", source))] - CannotCreatePreservedCatalog { source: DatabaseError }, - #[snafu(display("database failed to initialize: {}", source))] DatabaseInit { source: Arc }, } @@ -687,18 +679,14 @@ where }; let store_prefix = database_store_prefix(object_store, server_id, &db_name); - persist_database_rules(object_store, &store_prefix, rules).await?; - - create_preserved_catalog( - db_name.as_str(), - Arc::clone(self.shared.application.object_store()), + Database::create( + Arc::clone(&self.shared.application), + &store_prefix, + rules, server_id, - Arc::clone(self.shared.application.metric_registry()), - true, ) .await - .map_err(|e| Box::new(e) as _) - .context(CannotCreatePreservedCatalog)?; + .map_err(|e| Error::CannotCreateDatabase { source: e })?; let database = { let mut state = self.shared.state.write(); @@ -971,7 +959,8 @@ where &database.config().store_prefix, rules.as_ref().clone(), ) - .await?; + .await + .map_err(|e| Error::CannotPersistUpdatedRules { source: e })?; Ok(rules) } @@ -1270,32 +1259,6 @@ fn database_store_prefix( path } -/// Persist the the `DatabaseRules` given the `Database` store prefix -async fn persist_database_rules( - object_store: &ObjectStore, - store_prefix: &Path, - rules: DatabaseRules, -) -> Result<()> { - let mut data = BytesMut::new(); - encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?; - - let mut location = store_prefix.clone(); - location.set_file_name(DB_RULES_FILE_NAME); - - let len = data.len(); - - let stream_data = std::io::Result::Ok(data.freeze()); - object_store - .put( - &location, - futures::stream::once(async move { stream_data }), - Some(len), - ) - .await - .context(StoreError)?; - Ok(()) -} - #[cfg(test)] mod tests { use std::{ @@ -2268,7 +2231,7 @@ mod tests { // creating database will now result in an error let err = create_simple_database(&server, db_name).await.unwrap_err(); - assert!(matches!(err, Error::CannotCreatePreservedCatalog { .. })); + assert!(matches!(err, Error::CannotCreateDatabase { .. })); } // run a sql query against the database, returning the results as record batches