refactor: Move the database rules functionality to iox_object_store

pull/24376/head
Jake Goulding 2021-08-13 14:33:55 -04:00 committed by Carol (Nichols || Goulding)
parent 4447f1e22c
commit 63111d9d9a
3 changed files with 80 additions and 122 deletions

View File

@ -14,9 +14,12 @@
// TODO: Create an IoxPath type and only take/return paths of those types, and wrap in the
// database's root path before sending to the underlying object_store.
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use data_types::{server_id::ServerId, DatabaseName};
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use futures::{
stream::{self, BoxStream},
Stream, StreamExt, TryStreamExt,
};
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi, Result,
@ -29,6 +32,8 @@ mod paths;
use paths::{DataPath, RootPath};
pub use paths::{ParquetFilePath, ParquetFilePathParseError};
const DB_RULES_FILE_NAME: &str = "rules.pb";
/// Handles persistence of data for a particular database. Writes within its directory/prefix.
///
/// This wrapper on top of an `ObjectStore` maps IOx specific concepts to ObjectStore locations
@ -141,6 +146,37 @@ impl IoxObjectStore {
self.inner.delete(&full_path).await
}
fn db_rules_path(&self) -> Path {
self.root_path.join(DB_RULES_FILE_NAME)
}
/// Get the data for the database rules
pub async fn get_database_rules_file(&self) -> Result<bytes::Bytes> {
let mut stream = self.inner.get(&self.db_rules_path()).await?;
let mut bytes = BytesMut::new();
while let Some(buf) = stream.next().await {
bytes.extend(buf?);
}
Ok(bytes.freeze())
}
/// Store the data for the database rules
pub async fn put_database_rules_file(&self, bytes: bytes::Bytes) -> Result<()> {
let len = bytes.len();
let stream = stream::once(async move { Ok(bytes) });
self.inner
.put(&self.db_rules_path(), stream, Some(len))
.await
}
/// Delete the data for the database rules
pub async fn delete_database_rules_file(&self) -> Result<()> {
self.inner.delete(&self.db_rules_path()).await
}
/// List the relative paths in this database's object store.
pub async fn list(
&self,

View File

@ -3,7 +3,7 @@ use crate::{
load::{create_preserved_catalog, load_or_create_preserved_catalog},
DatabaseToCommit,
},
ApplicationState, Db, DB_RULES_FILE_NAME,
ApplicationState, Db,
};
use bytes::BytesMut;
use data_types::{
@ -17,10 +17,7 @@ use futures::{
use generated_types::database_rules::encode_database_rules;
use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use object_store::{
path::{ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use object_store::path::Path;
use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use parquet_file::catalog::PreservedCatalog;
@ -129,21 +126,18 @@ impl Database {
/// Create fresh database w/o any state.
pub async fn create(
application: Arc<ApplicationState>,
store_prefix: &Path,
rules: DatabaseRules,
server_id: ServerId,
) -> Result<(), InitError> {
let db_name = rules.name.clone();
let object_store =
IoxObjectStore::new(Arc::clone(application.object_store()), server_id, &db_name);
persist_database_rules(application.object_store(), store_prefix, rules).await?;
persist_database_rules(&object_store, rules).await?;
create_preserved_catalog(
db_name.as_str(),
Arc::new(IoxObjectStore::new(
Arc::clone(application.object_store()),
server_id,
&db_name,
)),
Arc::new(object_store),
server_id,
Arc::clone(application.metric_registry()),
true,
@ -608,11 +602,10 @@ impl DatabaseStateKnown {
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateRulesLoaded, InitError> {
let mut location = shared.config.store_prefix.clone();
location.set_file_name(DB_RULES_FILE_NAME);
// TODO: Retry this
let bytes = get_store_bytes(shared.application.object_store().as_ref(), &location)
let bytes = shared
.iox_object_store
.get_database_rules_file()
.await
.context(RulesFetch)?;
@ -715,47 +708,16 @@ struct DatabaseStateInitialized {
db: Arc<Db>,
}
/// Get the bytes for a given object store location
///
/// TODO: move to object_store crate
async fn get_store_bytes(
store: &ObjectStore,
location: &Path,
) -> Result<bytes::Bytes, object_store::Error> {
use futures::stream::TryStreamExt;
let stream = store.get(location).await?;
let bytes = stream
.try_fold(BytesMut::new(), |mut acc, buf| async move {
acc.extend_from_slice(&buf);
Ok(acc)
})
.await?;
Ok(bytes.freeze())
}
/// Persist the the `DatabaseRules` given the `Database` store prefix
pub(super) async fn persist_database_rules(
object_store: &ObjectStore,
store_prefix: &Path,
object_store: &IoxObjectStore,
rules: DatabaseRules,
) -> Result<(), InitError> {
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
let mut location = store_prefix.clone();
location.set_file_name(DB_RULES_FILE_NAME);
let len = data.len();
let stream_data = std::io::Result::Ok(data.freeze());
object_store
.put(
&location,
futures::stream::once(async move { stream_data }),
Some(len),
)
.put_database_rules_file(data.freeze())
.await
.context(StoreError)?;
Ok(())
@ -766,6 +728,7 @@ mod tests {
use chrono::Utc;
use data_types::database_rules::{PartitionTemplate, TemplatePart, WriteBufferConnection};
use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry};
use object_store::{ObjectStore, ObjectStoreApi};
use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
use super::*;
@ -877,7 +840,7 @@ mod tests {
)),
};
let store_prefix = application.object_store().new_path();
Database::create(Arc::clone(&application), &store_prefix, rules, server_id)
Database::create(Arc::clone(&application), rules, server_id)
.await
.unwrap();
let db_config = DatabaseConfig {

View File

@ -689,14 +689,9 @@ where
};
let store_prefix = database_store_prefix(object_store, server_id, &db_name);
Database::create(
Arc::clone(&self.shared.application),
&store_prefix,
rules,
server_id,
)
.await
.map_err(|e| Error::CannotCreateDatabase { source: e })?;
Database::create(Arc::clone(&self.shared.application), rules, server_id)
.await
.context(CannotCreateDatabase)?;
let database = {
let mut state = self.shared.state.write();
@ -964,13 +959,9 @@ where
let rules = db.update_rules(update).map_err(UpdateError::Closure)?;
// TODO: Handle failure
persist_database_rules(
self.shared.application.object_store().as_ref(),
&database.config().store_prefix,
rules.as_ref().clone(),
)
.await
.map_err(|e| Error::CannotPersistUpdatedRules { source: e })?;
persist_database_rules(&database.iox_object_store(), rules.as_ref().clone())
.await
.context(CannotPersistUpdatedRules)?;
Ok(rules)
}
@ -1226,8 +1217,6 @@ where
}
}
pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb";
/// Returns a list of database names and their prefix in object storage
async fn list_databases(
object_store: &ObjectStore,
@ -1284,12 +1273,11 @@ mod tests {
},
};
use entry::test_helpers::lp_to_entry;
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use iox_object_store::IoxObjectStore;
use metrics::TestMetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use object_store::ObjectStore;
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use std::{
@ -1360,26 +1348,16 @@ mod tests {
};
// Create a database
server
let bananas = server
.create_database(rules.clone())
.await
.expect("failed to create database");
let mut rules_path = application.object_store().new_path();
rules_path.push_all_dirs(&["1", name.as_str()]);
rules_path.set_file_name("rules.pb");
let read_data = application
.object_store()
.get(&rules_path)
let read_data = bananas
.iox_object_store()
.get_database_rules_file()
.await
.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap()
.freeze();
.unwrap();
let read_rules = decode_database_rules(read_data).unwrap();
assert_eq!(rules, read_rules);
@ -1464,19 +1442,14 @@ mod tests {
#[tokio::test]
async fn load_databases() {
let application = make_application();
let store = application.object_store();
let server = make_server(Arc::clone(&application));
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
create_simple_database(&server, "bananas")
let bananas = create_simple_database(&server, "bananas")
.await
.expect("failed to create database");
let mut rules_path = store.new_path();
rules_path.push_all_dirs(&["1", "bananas"]);
rules_path.set_file_name("rules.pb");
std::mem::drop(server);
let server = make_server(Arc::clone(&application));
@ -1491,8 +1464,9 @@ mod tests {
std::mem::drop(server);
store
.delete(&rules_path)
bananas
.iox_object_store()
.delete_database_rules_file()
.await
.expect("cannot delete rules file");
@ -1941,21 +1915,12 @@ mod tests {
.expect("failed to create database");
// tamper store
let mut path = database_store_prefix(store.as_ref(), server_id, &bar_db_name);
path.set_file_name(DB_RULES_FILE_NAME);
let data = Bytes::from("x");
let len = data.len();
store
.put(
&path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
let iox_object_store = IoxObjectStore::new(store, server_id, &bar_db_name);
iox_object_store
.put_database_rules_file(Bytes::from("x"))
.await
.unwrap();
store.get(&path).await.unwrap();
iox_object_store.get_database_rules_file().await.unwrap();
// start server
let server = make_server(application);
@ -2021,7 +1986,6 @@ mod tests {
// setup
let application = make_application();
let store = Arc::clone(application.object_store());
let server_id = ServerId::try_from(1).unwrap();
// Create temporary server to create existing databases
@ -2033,7 +1997,7 @@ mod tests {
.await
.expect("failed to create database");
create_simple_database(&server, db_name_rules_broken.clone())
let rules_broken = create_simple_database(&server, db_name_rules_broken.clone())
.await
.expect("failed to create database");
@ -2042,18 +2006,9 @@ mod tests {
.expect("failed to create database");
// tamper store to break one database
let mut path = database_store_prefix(store.as_ref(), server_id, &db_name_rules_broken);
path.set_file_name(DB_RULES_FILE_NAME);
let data = Bytes::from("x");
let len = data.len();
store
.put(
&path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
rules_broken
.iox_object_store()
.put_database_rules_file(Bytes::from("x"))
.await
.unwrap();
@ -2067,7 +2022,11 @@ mod tests {
.await;
drop(preserved_catalog);
store.get(&path).await.unwrap();
rules_broken
.iox_object_store()
.get_database_rules_file()
.await
.unwrap();
// boot actual test server
let server = make_server(Arc::clone(&application));