Merge branch 'main' into crepererum/improve_write_buffer_mocking

pull/24376/head
kodiakhq[bot] 2021-08-12 10:00:19 +00:00 committed by GitHub
commit 7956729ffa
4 changed files with 81 additions and 55 deletions

Cargo.lock

@@ -3874,7 +3874,6 @@ dependencies = [
"serde_json",
"snafu",
"snap",
"tempfile",
"test_helpers",
"tikv-jemalloc-ctl",
"tokio",


@@ -45,7 +45,6 @@ serde = "1.0"
serde_json = "1.0"
snafu = "0.6"
snap = "1.0.0"
tempfile = "3.1.0"
tikv-jemalloc-ctl = "0.4.0"
tokio = { version = "1.0", features = ["macros", "time"] }
tokio-util = { version = "0.6.3" }


@@ -7,6 +7,7 @@ use data_types::server_id::ServerId;
use data_types::{database_rules::DatabaseRules, DatabaseName};
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use generated_types::database_rules::encode_database_rules;
use internal_types::freezable::Freezable;
use object_store::path::{ObjectStorePath, Path};
use observability_deps::tracing::{error, info};
@@ -17,7 +18,7 @@ use tokio::sync::Notify;
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use crate::db::load::load_or_create_preserved_catalog;
use crate::db::load::{create_preserved_catalog, load_or_create_preserved_catalog};
use crate::db::DatabaseToCommit;
use crate::{ApplicationState, Db, DB_RULES_FILE_NAME};
use bytes::BytesMut;
@@ -88,6 +89,9 @@ pub struct DatabaseConfig {
}
impl Database {
/// Create in-mem database object.
///
/// This is backed by an existing database, which was [created](Self::create) some time in the past.
pub fn new(application: Arc<ApplicationState>, config: DatabaseConfig) -> Self {
info!(db_name=%config.name, store_prefix=%config.store_prefix.display(), "new database");
@@ -105,6 +109,30 @@ impl Database {
Self { join, shared }
}
/// Create fresh database w/o any state.
pub async fn create(
application: Arc<ApplicationState>,
store_prefix: &Path,
rules: DatabaseRules,
server_id: ServerId,
) -> Result<(), InitError> {
let db_name = rules.name.clone();
persist_database_rules(application.object_store(), store_prefix, rules).await?;
create_preserved_catalog(
db_name.as_str(),
Arc::clone(application.object_store()),
server_id,
Arc::clone(application.metric_registry()),
true,
)
.await
.context(CannotCreatePreservedCatalog)?;
Ok(())
}
/// Triggers shutdown of this `Database`
pub fn shutdown(&self) {
info!(db_name=%self.shared.config.name, "database shutting down");
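The split between `create` and `new` gives callers a two-step lifecycle: persist the rules and an empty preserved catalog once, then construct the cheap in-memory handle on every startup. A minimal sketch of that flow, assuming the signatures above; the `provision` helper and its argument wiring are hypothetical:

async fn provision(
    application: Arc<ApplicationState>,
    store_prefix: Path,
    rules: DatabaseRules,
    server_id: ServerId,
    config: DatabaseConfig,
) -> Result<Database, InitError> {
    // One-time: writes the rules blob under `store_prefix` and creates
    // an empty preserved catalog for this database.
    Database::create(Arc::clone(&application), &store_prefix, rules, server_id).await?;
    // Every startup: in-memory object backed by the state persisted above.
    Ok(Database::new(application, config))
}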
@@ -406,6 +434,17 @@ pub enum InitError {
#[snafu(display("error during replay: {}", source))]
Replay { source: crate::db::Error },
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("error serializing database rules to protobuf: {}", source))]
ErrorSerializingRulesProtobuf {
source: generated_types::database_rules::EncodeError,
},
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: crate::db::load::Error },
}
/// The Database startup state machine
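The three new `InitError` variants double as snafu context selectors, which is how `create` and `persist_database_rules` lift lower-level errors into `InitError`. A minimal sketch of the pattern, assuming snafu 0.6 as pinned in the Cargo.toml hunk above; `lift` is a hypothetical helper:

use snafu::ResultExt; // provides .context()

// Any Result carrying an object_store::Error can be converted into
// InitError::StoreError via the generated context selector.
fn lift(res: Result<(), object_store::Error>) -> Result<(), InitError> {
    res.context(StoreError)
}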
@@ -610,6 +649,32 @@ async fn get_store_bytes(
Ok(bytes.freeze())
}
/// Persist the `DatabaseRules` given the `Database` store prefix
pub(super) async fn persist_database_rules(
object_store: &ObjectStore,
store_prefix: &Path,
rules: DatabaseRules,
) -> Result<(), InitError> {
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
let mut location = store_prefix.clone();
location.set_file_name(DB_RULES_FILE_NAME);
let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze());
object_store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(StoreError)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
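`persist_database_rules` uploads the serialized rules as a one-chunk stream: `futures::stream::once` turns a single `Ok(Bytes)` into the `Stream` that `ObjectStore::put` expects, and the length is passed alongside so the store knows the payload size up front. The same call, condensed and annotated; `store`, `location`, and the filled `BytesMut` named `data` are assumed in scope:

let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze()); // BytesMut -> immutable Bytes
store
    .put(
        &location,
        futures::stream::once(async move { stream_data }), // single-chunk stream
        Some(len), // known payload size
    )
    .await
    .context(StoreError)?;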


@@ -71,7 +71,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use data_types::database_rules::ShardConfig;
use data_types::error::ErrorLogger;
use data_types::{
@@ -80,11 +79,9 @@ use data_types::{
server_id::ServerId,
{DatabaseName, DatabaseNameError},
};
use database::{Database, DatabaseConfig};
use db::load::create_preserved_catalog;
use database::{persist_database_rules, Database, DatabaseConfig};
use entry::{lines_to_sharded_entries, pb_to_entry, Entry, ShardedEntry};
use futures::future::{BoxFuture, Future, FutureExt, Shared, TryFutureExt};
use generated_types::database_rules::encode_database_rules;
use generated_types::influxdata::pbdata::v1 as pb;
use hashbrown::HashMap;
use influxdb_line_protocol::ParsedLine;
@@ -140,6 +137,12 @@ pub enum Error {
#[snafu(display("database not initialized"))]
DatabaseNotInitialized { db_name: String },
#[snafu(display("cannot persisted updated rules: {}", source))]
CannotPersistUpdatedRules { source: crate::database::InitError },
#[snafu(display("cannot create database: {}", source))]
CannotCreateDatabase { source: crate::database::InitError },
#[snafu(display("database not found"))]
DatabaseNotFound { db_name: String },
@@ -176,14 +179,6 @@ pub enum Error {
#[snafu(display("error replicating to remote: {}", source))]
ErrorReplicating { source: DatabaseError },
#[snafu(display("error serializing database rules to protobuf: {}", source))]
ErrorSerializingRulesProtobuf {
source: generated_types::database_rules::EncodeError,
},
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("error converting line protocol to flatbuffers: {}", source))]
LineConversion { source: entry::Error },
@@ -218,9 +213,6 @@ pub enum Error {
source: connection::ConnectionManagerError,
},
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: DatabaseError },
#[snafu(display("database failed to initialize: {}", source))]
DatabaseInit { source: Arc<database::InitError> },
}
@@ -687,18 +679,14 @@ where
};
let store_prefix = database_store_prefix(object_store, server_id, &db_name);
persist_database_rules(object_store, &store_prefix, rules).await?;
create_preserved_catalog(
db_name.as_str(),
Arc::clone(self.shared.application.object_store()),
Database::create(
Arc::clone(&self.shared.application),
&store_prefix,
rules,
server_id,
Arc::clone(self.shared.application.metric_registry()),
true,
)
.await
.map_err(|e| Box::new(e) as _)
.context(CannotCreatePreservedCatalog)?;
.map_err(|e| Error::CannotCreateDatabase { source: e })?;
let database = {
let mut state = self.shared.state.write();
@@ -971,7 +959,8 @@ where
&database.config().store_prefix,
rules.as_ref().clone(),
)
.await?;
.await
.map_err(|e| Error::CannotPersistUpdatedRules { source: e })?;
Ok(rules)
}
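Both server-side call sites now cross into the `database` module the same way: they receive an `InitError` and wrap it into the server's own `Error` enum with an explicit `map_err` rather than a snafu context selector. A condensed sketch of the two wrappings, with surrounding arguments elided and variable names illustrative:

// Creation: InitError -> Error::CannotCreateDatabase
Database::create(application, &store_prefix, rules, server_id)
    .await
    .map_err(|e| Error::CannotCreateDatabase { source: e })?;

// Rules update: InitError -> Error::CannotPersistUpdatedRules
persist_database_rules(object_store, &database.config().store_prefix, new_rules)
    .await
    .map_err(|e| Error::CannotPersistUpdatedRules { source: e })?;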
@@ -1270,32 +1259,6 @@ fn database_store_prefix(
path
}
/// Persist the `DatabaseRules` given the `Database` store prefix
async fn persist_database_rules(
object_store: &ObjectStore,
store_prefix: &Path,
rules: DatabaseRules,
) -> Result<()> {
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
let mut location = store_prefix.clone();
location.set_file_name(DB_RULES_FILE_NAME);
let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze());
object_store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(StoreError)?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::{
@@ -2268,7 +2231,7 @@ mod tests {
// creating database will now result in an error
let err = create_simple_database(&server, db_name).await.unwrap_err();
assert!(matches!(err, Error::CannotCreatePreservedCatalog { .. }));
assert!(matches!(err, Error::CannotCreateDatabase { .. }));
}
// run a sql query against the database, returning the results as record batches