diff --git a/iox_object_store/src/lib.rs b/iox_object_store/src/lib.rs
index 2f7bb04036..f6357e984d 100644
--- a/iox_object_store/src/lib.rs
+++ b/iox_object_store/src/lib.rs
@@ -14,9 +14,12 @@
 // TODO: Create an IoxPath type and only take/return paths of those types, and wrap in the
 // database's root path before sending to the underlying object_store.
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use data_types::{server_id::ServerId, DatabaseName};
-use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
+use futures::{
+    stream::{self, BoxStream},
+    Stream, StreamExt, TryStreamExt,
+};
 use object_store::{
     path::{parsed::DirsAndFileName, ObjectStorePath, Path},
     ObjectStore, ObjectStoreApi, Result,
@@ -29,6 +32,8 @@ mod paths;
 use paths::{DataPath, RootPath};
 pub use paths::{ParquetFilePath, ParquetFilePathParseError};
 
+const DB_RULES_FILE_NAME: &str = "rules.pb";
+
 /// Handles persistence of data for a particular database. Writes within its directory/prefix.
 ///
 /// This wrapper on top of an `ObjectStore` maps IOx specific concepts to ObjectStore locations
@@ -141,6 +146,37 @@ impl IoxObjectStore {
         self.inner.delete(&full_path).await
     }
 
+    fn db_rules_path(&self) -> Path {
+        self.root_path.join(DB_RULES_FILE_NAME)
+    }
+
+    /// Get the data for the database rules
+    pub async fn get_database_rules_file(&self) -> Result<Bytes> {
+        let mut stream = self.inner.get(&self.db_rules_path()).await?;
+        let mut bytes = BytesMut::new();
+
+        while let Some(buf) = stream.next().await {
+            bytes.extend(buf?);
+        }
+
+        Ok(bytes.freeze())
+    }
+
+    /// Store the data for the database rules
+    pub async fn put_database_rules_file(&self, bytes: bytes::Bytes) -> Result<()> {
+        let len = bytes.len();
+        let stream = stream::once(async move { Ok(bytes) });
+
+        self.inner
+            .put(&self.db_rules_path(), stream, Some(len))
+            .await
+    }
+
+    /// Delete the data for the database rules
+    pub async fn delete_database_rules_file(&self) -> Result<()> {
+        self.inner.delete(&self.db_rules_path()).await
+    }
+
     /// List the relative paths in this database's object store.
     pub async fn list(
         &self,
diff --git a/server/src/database.rs b/server/src/database.rs
index 424dabacf4..6b368bb16e 100644
--- a/server/src/database.rs
+++ b/server/src/database.rs
@@ -3,7 +3,7 @@ use crate::{
         load::{create_preserved_catalog, load_or_create_preserved_catalog},
         DatabaseToCommit,
     },
-    ApplicationState, Db, DB_RULES_FILE_NAME,
+    ApplicationState, Db,
 };
 use bytes::BytesMut;
 use data_types::{
@@ -17,10 +17,7 @@ use futures::{
 use generated_types::database_rules::encode_database_rules;
 use internal_types::freezable::Freezable;
 use iox_object_store::IoxObjectStore;
-use object_store::{
-    path::{ObjectStorePath, Path},
-    ObjectStore, ObjectStoreApi,
-};
+use object_store::path::Path;
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::RwLock;
 use parquet_file::catalog::PreservedCatalog;
@@ -129,21 +126,18 @@ impl Database {
     /// Create fresh database w/o any state.
     pub async fn create(
         application: Arc<ApplicationState>,
-        store_prefix: &Path,
         rules: DatabaseRules,
         server_id: ServerId,
     ) -> Result<(), InitError> {
         let db_name = rules.name.clone();
+        let object_store =
+            IoxObjectStore::new(Arc::clone(application.object_store()), server_id, &db_name);
 
-        persist_database_rules(application.object_store(), store_prefix, rules).await?;
+        persist_database_rules(&object_store, rules).await?;
 
         create_preserved_catalog(
             db_name.as_str(),
-            Arc::new(IoxObjectStore::new(
-                Arc::clone(application.object_store()),
-                server_id,
-                &db_name,
-            )),
+            Arc::new(object_store),
             server_id,
             Arc::clone(application.metric_registry()),
             true,
@@ -608,11 +602,10 @@ impl DatabaseStateKnown {
         &self,
         shared: &DatabaseShared,
     ) -> Result<DatabaseStateRulesLoaded, InitError> {
-        let mut location = shared.config.store_prefix.clone();
-        location.set_file_name(DB_RULES_FILE_NAME);
-
         // TODO: Retry this
-        let bytes = get_store_bytes(shared.application.object_store().as_ref(), &location)
+        let bytes = shared
+            .iox_object_store
+            .get_database_rules_file()
             .await
             .context(RulesFetch)?;
 
@@ -715,47 +708,16 @@ struct DatabaseStateInitialized {
     db: Arc<Db>,
 }
 
-/// Get the bytes for a given object store location
-///
-/// TODO: move to object_store crate
-async fn get_store_bytes(
-    store: &ObjectStore,
-    location: &Path,
-) -> Result<bytes::Bytes, object_store::Error> {
-    use futures::stream::TryStreamExt;
-
-    let stream = store.get(location).await?;
-    let bytes = stream
-        .try_fold(BytesMut::new(), |mut acc, buf| async move {
-            acc.extend_from_slice(&buf);
-            Ok(acc)
-        })
-        .await?;
-
-    Ok(bytes.freeze())
-}
-
 /// Persist the the `DatabaseRules` given the `Database` store prefix
 pub(super) async fn persist_database_rules(
-    object_store: &ObjectStore,
-    store_prefix: &Path,
+    object_store: &IoxObjectStore,
     rules: DatabaseRules,
 ) -> Result<(), InitError> {
     let mut data = BytesMut::new();
     encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
 
-    let mut location = store_prefix.clone();
-    location.set_file_name(DB_RULES_FILE_NAME);
-
-    let len = data.len();
-
-    let stream_data = std::io::Result::Ok(data.freeze());
     object_store
-        .put(
-            &location,
-            futures::stream::once(async move { stream_data }),
-            Some(len),
-        )
+        .put_database_rules_file(data.freeze())
        .await
        .context(StoreError)?;
     Ok(())
@@ -766,6 +728,7 @@ mod tests {
     use chrono::Utc;
     use data_types::database_rules::{PartitionTemplate, TemplatePart, WriteBufferConnection};
     use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry};
+    use object_store::{ObjectStore, ObjectStoreApi};
     use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
 
     use super::*;
@@ -877,7 +840,7 @@ mod tests {
             )),
         };
         let store_prefix = application.object_store().new_path();
-        Database::create(Arc::clone(&application), &store_prefix, rules, server_id)
+        Database::create(Arc::clone(&application), rules, server_id)
             .await
             .unwrap();
         let db_config = DatabaseConfig {
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 715fa04919..de8ec6b14a 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -689,14 +689,9 @@ where
         };
 
         let store_prefix = database_store_prefix(object_store, server_id, &db_name);
-        Database::create(
-            Arc::clone(&self.shared.application),
-            &store_prefix,
-            rules,
-            server_id,
-        )
-        .await
-        .map_err(|e| Error::CannotCreateDatabase { source: e })?;
+        Database::create(Arc::clone(&self.shared.application), rules, server_id)
+            .await
+            .context(CannotCreateDatabase)?;
 
         let database = {
             let mut state = self.shared.state.write();
@@ -964,13 +959,9 @@ where
         let rules = db.update_rules(update).map_err(UpdateError::Closure)?;
 
         // TODO: Handle failure
-        persist_database_rules(
-            self.shared.application.object_store().as_ref(),
-            &database.config().store_prefix,
-            rules.as_ref().clone(),
-        )
-        .await
-        .map_err(|e| Error::CannotPersistUpdatedRules { source: e })?;
+        persist_database_rules(&database.iox_object_store(), rules.as_ref().clone())
+            .await
+            .context(CannotPersistUpdatedRules)?;
 
         Ok(rules)
     }
@@ -1226,8 +1217,6 @@ where
     }
 }
 
-pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb";
-
 /// Returns a list of database names and their prefix in object storage
 async fn list_databases(
     object_store: &ObjectStore,
@@ -1284,12 +1273,11 @@ mod tests {
         },
     };
     use entry::test_helpers::lp_to_entry;
-    use futures::TryStreamExt;
     use generated_types::database_rules::decode_database_rules;
     use influxdb_line_protocol::parse_lines;
     use iox_object_store::IoxObjectStore;
     use metrics::TestMetricRegistry;
-    use object_store::{path::ObjectStorePath, ObjectStore};
+    use object_store::ObjectStore;
     use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
     use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
     use std::{
@@ -1360,26 +1348,16 @@ mod tests {
         };
 
         // Create a database
-        server
+        let bananas = server
             .create_database(rules.clone())
             .await
             .expect("failed to create database");
 
-        let mut rules_path = application.object_store().new_path();
-        rules_path.push_all_dirs(&["1", name.as_str()]);
-        rules_path.set_file_name("rules.pb");
-
-        let read_data = application
-            .object_store()
-            .get(&rules_path)
+        let read_data = bananas
+            .iox_object_store()
+            .get_database_rules_file()
             .await
-            .unwrap()
-            .map_ok(|b| bytes::BytesMut::from(&b[..]))
-            .try_concat()
-            .await
-            .unwrap()
-            .freeze();
-
+            .unwrap();
         let read_rules = decode_database_rules(read_data).unwrap();
 
         assert_eq!(rules, read_rules);
@@ -1464,19 +1442,14 @@ mod tests {
     #[tokio::test]
     async fn load_databases() {
         let application = make_application();
-        let store = application.object_store();
         let server = make_server(Arc::clone(&application));
         server.set_id(ServerId::try_from(1).unwrap()).unwrap();
         server.wait_for_init().await.unwrap();
 
-        create_simple_database(&server, "bananas")
+        let bananas = create_simple_database(&server, "bananas")
             .await
             .expect("failed to create database");
 
-        let mut rules_path = store.new_path();
-        rules_path.push_all_dirs(&["1", "bananas"]);
-        rules_path.set_file_name("rules.pb");
-
         std::mem::drop(server);
 
         let server = make_server(Arc::clone(&application));
@@ -1491,8 +1464,9 @@ mod tests {
 
         std::mem::drop(server);
 
-        store
-            .delete(&rules_path)
+        bananas
+            .iox_object_store()
+            .delete_database_rules_file()
             .await
             .expect("cannot delete rules file");
 
@@ -1941,21 +1915,12 @@ mod tests {
             .expect("failed to create database");
 
         // tamper store
-        let mut path = database_store_prefix(store.as_ref(), server_id, &bar_db_name);
-        path.set_file_name(DB_RULES_FILE_NAME);
-
-        let data = Bytes::from("x");
-        let len = data.len();
-        store
-            .put(
-                &path,
-                futures::stream::once(async move { Ok(data) }),
-                Some(len),
-            )
+        let iox_object_store = IoxObjectStore::new(store, server_id, &bar_db_name);
+        iox_object_store
+            .put_database_rules_file(Bytes::from("x"))
             .await
             .unwrap();
-
-        store.get(&path).await.unwrap();
+        iox_object_store.get_database_rules_file().await.unwrap();
 
         // start server
         let server = make_server(application);
@@ -2021,7 +1986,6 @@ mod tests {
 
         // setup
         let application = make_application();
-        let store = Arc::clone(application.object_store());
         let server_id = ServerId::try_from(1).unwrap();
 
         // Create temporary server to create existing databases
@@ -2033,7 +1997,7 @@
             .await
             .expect("failed to create database");
 
-        create_simple_database(&server, db_name_rules_broken.clone())
+        let rules_broken = create_simple_database(&server, db_name_rules_broken.clone())
             .await
             .expect("failed to create database");
 
@@ -2042,18 +2006,9 @@
             .expect("failed to create database");
 
         // tamper store to break one database
-        let mut path = database_store_prefix(store.as_ref(), server_id, &db_name_rules_broken);
-        path.set_file_name(DB_RULES_FILE_NAME);
-
-        let data = Bytes::from("x");
-        let len = data.len();
-
-        store
-            .put(
-                &path,
-                futures::stream::once(async move { Ok(data) }),
-                Some(len),
-            )
+        rules_broken
+            .iox_object_store()
+            .put_database_rules_file(Bytes::from("x"))
             .await
             .unwrap();
 
@@ -2067,7 +2022,11 @@
             .await;
         drop(preserved_catalog);
 
-        store.get(&path).await.unwrap();
+        rules_broken
+            .iox_object_store()
+            .get_database_rules_file()
+            .await
+            .unwrap();
 
         // boot actual test server
         let server = make_server(Arc::clone(&application));
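
For reference, a rough sketch of how the new rules-file helpers on `IoxObjectStore` are meant to be called, mirroring the test changes above. The in-memory store, server id, and database name used here are illustrative only, not part of this diff.

```rust
use std::{convert::TryFrom, sync::Arc};

use bytes::Bytes;
use data_types::{server_id::ServerId, DatabaseName};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;

async fn rules_file_roundtrip() {
    // Illustrative setup: an in-memory object store, server id 1, database "bananas".
    let store = Arc::new(ObjectStore::new_in_memory());
    let server_id = ServerId::try_from(1).unwrap();
    let db_name = DatabaseName::new("bananas").unwrap();

    // IoxObjectStore now resolves the rules.pb location under the database root itself,
    // so callers no longer build a Path with set_file_name.
    let iox_object_store = IoxObjectStore::new(store, server_id, &db_name);

    // Write, read back, and delete the serialized rules without touching paths directly.
    iox_object_store
        .put_database_rules_file(Bytes::from("encoded rules"))
        .await
        .unwrap();
    let read = iox_object_store.get_database_rules_file().await.unwrap();
    assert_eq!(read, Bytes::from("encoded rules"));
    iox_object_store.delete_database_rules_file().await.unwrap();
}
```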