Merge branch 'main' into er/feat/read_buffer_metrics

pull/24376/head
Edd Robinson 2021-08-13 15:02:03 +01:00 committed by GitHub
commit 13aaa1f105
22 changed files with 1121 additions and 1546 deletions

Cargo.lock (generated)

@ -1798,6 +1798,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "iox_object_store"
version = "0.1.0"
dependencies = [
"bytes",
"data_types",
"futures",
"object_store",
"tokio",
"tokio-stream",
]
[[package]]
name = "ipnet"
version = "2.3.1"
@ -2675,6 +2687,7 @@ dependencies = [
"futures",
"generated_types",
"internal_types",
"iox_object_store",
"metrics",
"object_store",
"observability_deps",
@ -3860,6 +3873,7 @@ dependencies = [
"influxdb_iox_client",
"influxdb_line_protocol",
"internal_types",
"iox_object_store",
"itertools 0.10.1",
"lifecycle",
"metrics",


@ -25,6 +25,7 @@ members = [
"influxdb_line_protocol",
"influxdb_tsm",
"internal_types",
"iox_object_store",
"logfmt",
"lifecycle",
"mem_qe",


@ -0,0 +1,13 @@
[package]
name = "iox_object_store"
version = "0.1.0"
edition = "2018"
description = "IOx-specific semantics wrapping the general-purpose object store crate"
[dependencies]
bytes = "1.0"
data_types = { path = "../data_types" }
futures = "0.3"
object_store = { path = "../object_store" }
tokio = { version = "1.0", features = ["macros", "time"] }
tokio-stream = "0.1"

iox_object_store/src/lib.rs (new file)

@ -0,0 +1,200 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
//! Wraps the object_store crate with IOx-specific semantics.
// TODO: Create an IoxPath type and only take/return paths of those types, and wrap in the
// database's root path before sending to the underlying object_store.
use bytes::Bytes;
use data_types::{server_id::ServerId, DatabaseName};
use futures::{stream::BoxStream, Stream, StreamExt};
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi, Result,
};
use std::{io, sync::Arc};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
/// Handles persistence of data for a particular database. Writes within its directory/prefix.
///
/// This wrapper on top of an `ObjectStore` maps IOx-specific concepts to `ObjectStore` locations.
#[derive(Debug)]
pub struct IoxObjectStore {
inner: Arc<ObjectStore>,
server_id: ServerId,
database_name: String, // TODO: use data_types DatabaseName?
root_path: RootPath,
}
impl IoxObjectStore {
/// Create a database-specific wrapper. Takes all the information needed to create the
/// root directory of a database.
pub fn new(
inner: Arc<ObjectStore>,
server_id: ServerId,
database_name: &DatabaseName<'_>,
) -> Self {
let root_path = RootPath::new(inner.new_path(), server_id, database_name);
Self {
inner,
server_id,
database_name: database_name.into(),
root_path,
}
}
/// The name of the database this object store is for.
pub fn database_name(&self) -> &str {
&self.database_name
}
/// Path where transactions are stored.
///
/// The format is:
///
/// ```text
/// <server_id>/<db_name>/transactions/
/// ```
// TODO: avoid leaking this outside this crate
pub fn catalog_path(&self) -> Path {
let mut path = self.inner.new_path();
path.push_dir(self.server_id.to_string());
path.push_dir(&self.database_name);
path.push_dir("transactions");
path
}
/// Location where parquet data goes to.
///
/// Schema currently is:
///
/// ```text
/// <server_id>/<db_name>/data/
/// ```
// TODO: avoid leaking this outside this crate
pub fn data_path(&self) -> Path {
let mut path = self.inner.new_path();
path.push_dir(self.server_id.to_string());
path.push_dir(&self.database_name);
path.push_dir("data");
path
}
/// Store this data in this database's object store.
pub async fn put<S>(&self, location: &Path, bytes: S, length: Option<usize>) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
self.inner.put(location, bytes, length).await
}
/// List all the catalog transaction files in object storage for this database.
pub async fn catalog_transaction_files(&self) -> Result<BoxStream<'static, Result<Vec<Path>>>> {
Ok(self.list(Some(&self.catalog_path())).await?.boxed())
}
/// List the relative paths in this database's object store.
pub async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'static, Result<Vec<Path>>>> {
let (tx, rx) = channel(4);
let inner = Arc::clone(&self.inner);
let prefix = prefix.cloned();
// This is necessary because of the lifetime restrictions on the ObjectStoreApi trait's
// methods. Those restrictions might not actually be needed, but relaxing them involves
// longer-term changes to the cloud_storage crate.
tokio::spawn(async move {
match inner.list(prefix.as_ref()).await {
Err(e) => {
let _ = tx.send(Err(e)).await;
}
Ok(mut stream) => {
while let Some(list) = stream.next().await {
let _ = tx.send(list).await;
}
}
}
});
Ok(ReceiverStream::new(rx).boxed())
}
/// Get the data in this relative path in this database's object store.
pub async fn get(&self, location: &Path) -> Result<BoxStream<'static, Result<Bytes>>> {
self.inner.get(location).await
}
/// Delete the relative paths in this database's object store.
pub async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}
/// Create implementation-specific path from parsed representation.
/// This might not be needed eventually
pub fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Path {
self.inner.path_from_dirs_and_filename(path)
}
}
/// A database-specific object store path that all `IoxPath`s should be within.
#[derive(Debug, Clone)]
struct RootPath {
root: Path,
}
impl RootPath {
/// How the root of a database is defined in object storage.
fn new(mut root: Path, server_id: ServerId, database_name: &DatabaseName<'_>) -> Self {
root.push_dir(server_id.to_string());
root.push_dir(database_name.as_str());
Self { root }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::num::NonZeroU32;
/// Creates new test server ID
fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
}
/// Creates a new in-memory object store. These tests rely on the `Path`s being of type
/// `DirsAndFileName` and thus using object_store::path::DELIMITER as the separator
fn make_object_store() -> Arc<ObjectStore> {
Arc::new(ObjectStore::new_in_memory())
}
#[test]
fn catalog_path_is_relative_to_db_root() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name);
assert_eq!(
iox_object_store.catalog_path().display(),
"1/clouds/transactions/"
);
}
#[test]
fn data_path_is_relative_to_db_root() {
let server_id = make_server_id();
let database_name = DatabaseName::new("clouds").unwrap();
let iox_object_store = IoxObjectStore::new(make_object_store(), server_id, &database_name);
assert_eq!(iox_object_store.data_path().display(), "1/clouds/data/");
}
}
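For orientation, a minimal sketch of how a caller outside the test module might use the new wrapper; the function name, the in-memory store, server ID 1 and the database name "clouds" mirror the tests above and are purely illustrative.

use std::{num::NonZeroU32, sync::Arc};

use data_types::{server_id::ServerId, DatabaseName};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;

fn demo_database_paths() {
    // Wrap a plain (here: in-memory) ObjectStore for one database on server 1.
    let inner = Arc::new(ObjectStore::new_in_memory());
    let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
    let database_name = DatabaseName::new("clouds").unwrap();
    let iox_object_store = IoxObjectStore::new(inner, server_id, &database_name);

    // Every path the wrapper hands out lives under the database root "<server_id>/<db_name>/".
    assert_eq!(
        iox_object_store.catalog_path().display(),
        "1/clouds/transactions/"
    );
    assert_eq!(iox_object_store.data_path().display(), "1/clouds/data/");
}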


@ -130,7 +130,7 @@ impl DirsAndFileName {
}
/// Add a `PathPart` to the end of the path's directories.
pub(crate) fn push_part_as_dir(&mut self, part: &PathPart) {
pub fn push_part_as_dir(&mut self, part: &PathPart) {
self.directories.push(part.to_owned());
}


@ -14,9 +14,10 @@ datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3.7"
generated_types = { path = "../generated_types" }
internal_types = {path = "../internal_types"}
internal_types = { path = "../internal_types" }
iox_object_store = { path = "../iox_object_store" }
metrics = { path = "../metrics" }
object_store = {path = "../object_store"}
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parquet = "5.0"
parquet-format = "2.6"

File diff suppressed because it is too large


@ -8,7 +8,8 @@ use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use object_store::{path::Path, ObjectStore};
use iox_object_store::IoxObjectStore;
use object_store::path::Path;
use query::predicate::Predicate;
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, mem, sync::Arc};
@ -91,8 +92,8 @@ pub struct ParquetChunk {
/// (extracted from TableSummary)
timestamp_range: Option<TimestampRange>,
/// Object store of the above relative path to open and read the file
object_store: Arc<ObjectStore>,
/// Persists the parquet file within a database's relative path
iox_object_store: Arc<IoxObjectStore>,
/// Path in the object store. Format:
/// <writer id>/<database>/data/<partition key>/<chunk
@ -112,7 +113,7 @@ impl ParquetChunk {
/// Creates new chunk from given parquet metadata.
pub fn new(
file_location: Path,
store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
file_size_bytes: usize,
parquet_metadata: Arc<IoxParquetMetaData>,
table_name: Arc<str>,
@ -137,7 +138,7 @@ impl ParquetChunk {
Arc::new(table_summary),
schema,
file_location,
store,
iox_object_store,
file_size_bytes,
parquet_metadata,
metrics,
@ -152,7 +153,7 @@ impl ParquetChunk {
table_summary: Arc<TableSummary>,
schema: Arc<Schema>,
file_location: Path,
store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
file_size_bytes: usize,
parquet_metadata: Arc<IoxParquetMetaData>,
metrics: ChunkMetrics,
@ -164,7 +165,7 @@ impl ParquetChunk {
table_summary,
schema,
timestamp_range,
object_store: store,
iox_object_store,
object_store_path: file_location,
file_size_bytes,
parquet_metadata,
@ -247,7 +248,7 @@ impl ParquetChunk {
selection,
Arc::clone(&self.schema.as_arrow()),
self.object_store_path.clone(),
Arc::clone(&self.object_store),
Arc::clone(&self.iox_object_store),
)
.context(ReadParquet)
}


@ -1,11 +1,9 @@
//! Methods to cleanup the object store.
use std::{collections::HashSet, sync::Arc};
use crate::{
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
storage::data_location,
};
use crate::catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog};
use futures::TryStreamExt;
use iox_object_store::IoxObjectStore;
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
@ -51,29 +49,26 @@ pub async fn get_unreferenced_parquet_files(
catalog: &PreservedCatalog,
max_files: usize,
) -> Result<Vec<Path>> {
let store = catalog.object_store();
let server_id = catalog.server_id();
let db_name = catalog.db_name();
let iox_object_store = catalog.iox_object_store();
let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced
let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
Arc::clone(&store),
server_id,
db_name.to_string(),
(),
)
.await
.context(CatalogLoadError)?
.expect("catalog gone while reading it?");
let (_catalog, state) =
PreservedCatalog::load::<TracerCatalogState>(Arc::clone(&iox_object_store), ())
.await
.context(CatalogLoadError)?
.expect("catalog gone while reading it?");
state.files.into_inner()
};
let prefix = data_location(&store, server_id, db_name);
let prefix = iox_object_store.data_path();
// gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
let mut to_remove = vec![];
let mut stream = store.list(Some(&prefix)).await.context(ReadError)?;
let mut stream = iox_object_store
.list(Some(&prefix))
.await
.context(ReadError)?;
'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? {
for path in paths {
@ -111,7 +106,7 @@ pub async fn get_unreferenced_parquet_files(
/// File creation and catalog modifications can be done while calling this method. Even
/// [`get_unreferenced_parquet_files`] can be called while this method is in progress.
pub async fn delete_files(catalog: &PreservedCatalog, files: &[Path]) -> Result<()> {
let store = catalog.object_store();
let store = catalog.iox_object_store();
for path in files {
info!(path = %path.display(), "Delete file");
@ -139,7 +134,7 @@ impl CatalogState for TracerCatalogState {
fn add(
&mut self,
_object_store: Arc<ObjectStore>,
_iox_object_store: Arc<IoxObjectStore>,
info: CatalogParquetInfo,
) -> crate::catalog::Result<()> {
self.files.lock().insert(info.path);
@ -154,33 +149,26 @@ impl CatalogState for TracerCatalogState {
#[cfg(test)]
mod tests {
use std::{collections::HashSet, num::NonZeroU32, sync::Arc};
use std::{collections::HashSet, sync::Arc};
use bytes::Bytes;
use data_types::server_id::ServerId;
use object_store::path::{ObjectStorePath, Path};
use tokio::sync::RwLock;
use super::*;
use crate::{
catalog::test_helpers::TestCatalogState,
test_utils::{chunk_addr, db_name, make_metadata, make_object_store},
test_utils::{chunk_addr, make_iox_object_store, make_metadata},
};
#[tokio::test]
async fn test_cleanup_empty() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let iox_object_store = make_iox_object_store();
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
// run clean-up
let files = get_unreferenced_parquet_files(&catalog, 1_000)
@ -191,18 +179,12 @@ mod tests {
#[tokio::test]
async fn test_cleanup_rules() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = db_name();
let iox_object_store = make_iox_object_store();
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
// create some data
let mut paths_keep = vec![];
@ -211,7 +193,7 @@ mod tests {
let mut transaction = catalog.open_transaction().await;
// an ordinary tracked parquet file => keep
let (path, metadata) = make_metadata(&object_store, "foo", chunk_addr(1)).await;
let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(1)).await;
let metadata = Arc::new(metadata);
let path = path.into();
let info = CatalogParquetInfo {
@ -224,7 +206,7 @@ mod tests {
paths_keep.push(info.path.display());
// another ordinary tracked parquet file that was added and removed => keep (for time travel)
let (path, metadata) = make_metadata(&object_store, "foo", chunk_addr(2)).await;
let (path, metadata) = make_metadata(&iox_object_store, "foo", chunk_addr(2)).await;
let metadata = Arc::new(metadata);
let path = path.into();
let info = CatalogParquetInfo {
@ -239,12 +221,12 @@ mod tests {
// not a parquet file => keep
let mut path = info.path;
path.file_name = Some("foo.txt".into());
let path = object_store.path_from_dirs_and_filename(path);
create_empty_file(&object_store, &path).await;
let path = iox_object_store.path_from_dirs_and_filename(path);
create_empty_file(&iox_object_store, &path).await;
paths_keep.push(path.display());
// an untracked parquet file => delete
let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await;
let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(3)).await;
paths_delete.push(path.display());
transaction.commit().await.unwrap();
@ -260,7 +242,7 @@ mod tests {
delete_files(&catalog, &files).await.unwrap();
// list all files
let all_files = list_all_files(&object_store).await;
let all_files = list_all_files(&iox_object_store).await;
for p in paths_keep {
assert!(dbg!(&all_files).contains(dbg!(&p)));
}
@ -271,32 +253,26 @@ mod tests {
#[tokio::test]
async fn test_cleanup_with_parallel_transaction() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = db_name();
let iox_object_store = make_iox_object_store();
let lock: RwLock<()> = Default::default();
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
// try multiple times to provoke a conflict
for i in 0..100 {
// Every so often try to create a file with the same ChunkAddr beforehand. This should not trick the cleanup
// logic into removing the actual file because file paths contain a UUIDv4 part.
if i % 2 == 0 {
make_metadata(&object_store, "foo", chunk_addr(i)).await;
make_metadata(&iox_object_store, "foo", chunk_addr(i)).await;
}
let (path, _) = tokio::join!(
async {
let guard = lock.read().await;
let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await;
let (path, md) = make_metadata(&iox_object_store, "foo", chunk_addr(i)).await;
let metadata = Arc::new(md);
let path = path.into();
@ -325,30 +301,24 @@ mod tests {
},
);
let all_files = list_all_files(&object_store).await;
let all_files = list_all_files(&iox_object_store).await;
assert!(all_files.contains(&path));
}
}
#[tokio::test]
async fn test_cleanup_max_files() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = db_name();
let iox_object_store = make_iox_object_store();
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
// create some files
let mut to_remove: HashSet<String> = Default::default();
for chunk_id in 0..3 {
let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(chunk_id)).await;
let (path, _md) = make_metadata(&iox_object_store, "foo", chunk_addr(chunk_id)).await;
to_remove.insert(path.display());
}
@ -358,7 +328,7 @@ mod tests {
delete_files(&catalog, &files).await.unwrap();
// should only delete 2
let all_files = list_all_files(&object_store).await;
let all_files = list_all_files(&iox_object_store).await;
let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
assert_eq!(leftover.len(), 1);
@ -368,20 +338,16 @@ mod tests {
delete_files(&catalog, &files).await.unwrap();
// should delete remaining file
let all_files = list_all_files(&object_store).await;
let all_files = list_all_files(&iox_object_store).await;
let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
assert_eq!(leftover.len(), 0);
}
fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
}
async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
async fn create_empty_file(iox_object_store: &IoxObjectStore, path: &Path) {
let data = Bytes::default();
let len = data.len();
object_store
iox_object_store
.put(
path,
futures::stream::once(async move { Ok(data) }),
@ -391,8 +357,8 @@ mod tests {
.unwrap();
}
async fn list_all_files(object_store: &ObjectStore) -> HashSet<String> {
object_store
async fn list_all_files(iox_object_store: &IoxObjectStore) -> HashSet<String> {
iox_object_store
.list(None)
.await
.unwrap()


@ -811,15 +811,17 @@ mod tests {
use crate::test_utils::{
chunk_addr, create_partition_and_database_checkpoint, load_parquet_from_store, make_chunk,
make_chunk_no_row_group, make_object_store,
make_chunk_no_row_group, make_iox_object_store,
};
#[tokio::test]
async fn test_restore_from_file() {
// setup: preserve chunk to object store
let store = make_object_store();
let chunk = make_chunk(Arc::clone(&store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, store).await.unwrap();
let iox_object_store = make_iox_object_store();
let chunk = make_chunk(Arc::clone(&iox_object_store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
// step 1: read back schema
@ -841,9 +843,11 @@ mod tests {
#[tokio::test]
async fn test_restore_from_thrift() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let store = make_object_store();
let chunk = make_chunk(Arc::clone(&store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, store).await.unwrap();
let iox_object_store = make_iox_object_store();
let chunk = make_chunk(Arc::clone(&iox_object_store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let data = parquet_metadata.to_thrift().unwrap();
let parquet_metadata = IoxParquetMetaData::from_thrift(&data).unwrap();
@ -862,9 +866,12 @@ mod tests {
#[tokio::test]
async fn test_restore_from_file_no_row_group() {
// setup: preserve chunk to object store
let store = make_object_store();
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, store).await.unwrap();
let iox_object_store = make_iox_object_store();
let chunk =
make_chunk_no_row_group(Arc::clone(&iox_object_store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
// step 1: read back schema
@ -883,9 +890,12 @@ mod tests {
#[tokio::test]
async fn test_restore_from_thrift_no_row_group() {
// setup: write chunk to object store and only keep thrift-encoded metadata
let store = make_object_store();
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, store).await.unwrap();
let iox_object_store = make_iox_object_store();
let chunk =
make_chunk_no_row_group(Arc::clone(&iox_object_store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
let data = parquet_metadata.to_thrift().unwrap();
let parquet_metadata = IoxParquetMetaData::from_thrift(&data).unwrap();
@ -905,9 +915,11 @@ mod tests {
#[tokio::test]
async fn test_make_chunk() {
let store = make_object_store();
let chunk = make_chunk(Arc::clone(&store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, store).await.unwrap();
let iox_object_store = make_iox_object_store();
let chunk = make_chunk(Arc::clone(&iox_object_store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
assert!(parquet_metadata.md.num_row_groups() > 1);
@ -945,9 +957,12 @@ mod tests {
#[tokio::test]
async fn test_make_chunk_no_row_group() {
let store = make_object_store();
let chunk = make_chunk_no_row_group(Arc::clone(&store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, store).await.unwrap();
let iox_object_store = make_iox_object_store();
let chunk =
make_chunk_no_row_group(Arc::clone(&iox_object_store), "foo", chunk_addr(1)).await;
let parquet_data = load_parquet_from_store(&chunk, iox_object_store)
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data).unwrap();
assert_eq!(parquet_metadata.md.num_row_groups(), 0);


@ -1,12 +1,9 @@
//! Contains code to rebuild a catalog from files.
use std::{fmt::Debug, sync::Arc};
use data_types::server_id::ServerId;
use futures::TryStreamExt;
use object_store::{
path::{parsed::DirsAndFileName, Path},
ObjectStore, ObjectStoreApi,
};
use iox_object_store::IoxObjectStore;
use object_store::path::{parsed::DirsAndFileName, Path};
use observability_deps::tracing::error;
use snafu::{ResultExt, Snafu};
@ -45,18 +42,20 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// procedure (**after creating a backup!**).
///
/// # Limitations
/// Compared to an intact catalog, wiping a catalog and rebuilding it from Parquet files has the following drawbacks:
/// Compared to an intact catalog, wiping a catalog and rebuilding it from Parquet files has the
/// following drawbacks:
///
/// - **Garbage Susceptibility:** The rebuild process will stumble over garbage parquet files (i.e. files being present
/// in the object store but that were not part of the catalog). For files that where not written by IOx it will likely
/// report [`Error::MetadataReadFailure`].
/// - **No Removals:** The rebuild system cannot recover the fact that files where removed from the catalog during some
/// transaction. This might not always be an issue due to "deduplicate while read"-logic in the query engine, but also
/// might have unwanted side effects (e.g. performance issues).
/// - **Single Transaction:** All files are added in a single transaction. Hence time-traveling is NOT possible using
/// the resulting catalog.
/// - **Fork Detection:** The rebuild procedure does NOT detects forks (i.e. files written for the same server ID by
/// multiple IOx instances).
/// - **Garbage Susceptibility:** The rebuild process will stumble over garbage parquet files (i.e.
/// files being present in the object store but that were not part of the catalog). For files
/// that were not written by IOx it will likely report [`Error::MetadataReadFailure`].
/// - **No Removals:** The rebuild system cannot recover the fact that files were removed from the
/// catalog during some transaction. This might not always be an issue due to "deduplicate while
/// read"-logic in the query engine, but also might have unwanted side effects (e.g. performance
/// issues).
/// - **Single Transaction:** All files are added in a single transaction. Hence time-traveling is
/// NOT possible using the resulting catalog.
/// - **Fork Detection:** The rebuild procedure does NOT detect forks (i.e. files written for the
/// same server ID by multiple IOx instances).
///
/// # Error Handling
/// This routine will fail if:
@ -64,10 +63,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// - **Metadata Read Failure:** There is a parquet file with metadata that cannot be read. Set
/// `ignore_metadata_read_failure` to `true` to ignore these cases.
pub async fn rebuild_catalog<S>(
object_store: Arc<ObjectStore>,
search_location: &Path,
server_id: ServerId,
db_name: String,
iox_object_store: Arc<IoxObjectStore>,
catalog_empty_input: S::EmptyInput,
ignore_metadata_read_failure: bool,
) -> Result<(PreservedCatalog, S)>
@ -75,24 +71,20 @@ where
S: CatalogState + Debug + Send + Sync,
{
// collect all revisions from parquet files
let files = collect_files(&object_store, search_location, ignore_metadata_read_failure).await?;
let files = collect_files(&iox_object_store, ignore_metadata_read_failure).await?;
// create new empty catalog
let (catalog, mut state) = PreservedCatalog::new_empty::<S>(
Arc::clone(&object_store),
server_id,
db_name,
catalog_empty_input,
)
.await
.context(NewEmptyFailure)?;
let (catalog, mut state) =
PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), catalog_empty_input)
.await
.context(NewEmptyFailure)?;
// create single transaction with all files
if !files.is_empty() {
let mut transaction = catalog.open_transaction().await;
for info in files {
state
.add(Arc::clone(&object_store), info.clone())
.add(Arc::clone(&iox_object_store), info.clone())
.context(FileRecordFailure)?;
transaction.add_parquet(&info).context(FileRecordFailure)?;
}
@ -102,26 +94,22 @@ where
Ok((catalog, state))
}
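For reference, a sketch of the new call shape, lifted from the updated tests further down in this file (TestCatalogState and make_iox_object_store are this crate's existing test helpers); it only illustrates the signature and is not additional test coverage in the diff.

#[tokio::test]
async fn rebuild_catalog_call_shape() {
    // The search location, server ID and database name parameters are gone: the
    // IoxObjectStore is already scoped to one database's root.
    let iox_object_store = make_iox_object_store();
    let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), false)
        .await
        .unwrap();

    // A fresh store contains no parquet files, so the rebuilt catalog is empty.
    assert!(state.parquet_files.is_empty());
    assert_eq!(catalog.revision_counter(), 0);
}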
/// Collect all files under the given locations.
/// Collect all files for the database managed by the given IoxObjectStore.
///
/// Returns a vector of (file, size, metadata) tuples.
///
/// The file listing is recursive.
async fn collect_files(
object_store: &ObjectStore,
search_location: &Path,
iox_object_store: &IoxObjectStore,
ignore_metadata_read_failure: bool,
) -> Result<Vec<CatalogParquetInfo>> {
let mut stream = object_store
.list(Some(search_location))
.await
.context(ReadFailure)?;
let mut stream = iox_object_store.list(None).await.context(ReadFailure)?;
let mut files = vec![];
while let Some(paths) = stream.try_next().await.context(ReadFailure)? {
for path in paths.into_iter().filter(is_parquet) {
match read_parquet(object_store, &path).await {
match read_parquet(iox_object_store, &path).await {
Ok((file_size_bytes, metadata)) => {
let path = path.into();
files.push(CatalogParquetInfo {
@ -154,10 +142,10 @@ fn is_parquet(path: &Path) -> bool {
/// Read Parquet and IOx metadata from given path.
async fn read_parquet(
object_store: &ObjectStore,
iox_object_store: &IoxObjectStore,
path: &Path,
) -> Result<(usize, Arc<IoxParquetMetaData>)> {
let data = object_store
let data = iox_object_store
.get(path)
.await
.context(ReadFailure)?
@ -189,7 +177,6 @@ mod tests {
use tokio_stream::StreamExt;
use super::*;
use std::num::NonZeroU32;
use crate::metadata::IoxMetadata;
use crate::test_utils::create_partition_and_database_checkpoint;
@ -197,32 +184,26 @@ mod tests {
use crate::{
catalog::PreservedCatalog,
storage::Storage,
test_utils::{make_object_store, make_record_batch},
test_utils::{make_iox_object_store, make_record_batch},
};
#[tokio::test]
async fn test_rebuild_successfull() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = Arc::<str>::from("db1");
let iox_object_store = make_iox_object_store();
// build catalog with some data
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, mut state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
{
let mut transaction = catalog.open_transaction().await;
let info = create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 0).await;
let info = create_parquet_file(&iox_object_store, 0).await;
state.parquet_files.insert(info.path.clone(), info.clone());
transaction.add_parquet(&info).unwrap();
let info = create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 1).await;
let info = create_parquet_file(&iox_object_store, 1).await;
state.parquet_files.insert(info.path.clone(), info.clone());
transaction.add_parquet(&info).unwrap();
@ -236,7 +217,7 @@ mod tests {
{
let mut transaction = catalog.open_transaction().await;
let info = create_parquet_file(&object_store, server_id, Arc::clone(&db_name), 2).await;
let info = create_parquet_file(&iox_object_store, 2).await;
state.parquet_files.insert(info.path.clone(), info.clone());
transaction.add_parquet(&info).unwrap();
@ -252,22 +233,12 @@ mod tests {
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, &db_name)
.await
.unwrap();
PreservedCatalog::wipe(&iox_object_store).await.unwrap();
// rebuild
let path = object_store.new_path();
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name.to_string(),
(),
false,
)
.await
.unwrap();
let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), false)
.await
.unwrap();
// check match
let paths_actual = {
@ -281,38 +252,22 @@ mod tests {
#[tokio::test]
async fn test_rebuild_empty() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let iox_object_store = make_iox_object_store();
// build empty catalog
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
PreservedCatalog::wipe(&iox_object_store).await.unwrap();
// rebuild
let path = object_store.new_path();
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name.to_string(),
(),
false,
)
.await
.unwrap();
let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), false)
.await
.unwrap();
// check match
assert!(state.parquet_files.is_empty());
@ -321,54 +276,31 @@ mod tests {
#[tokio::test]
async fn test_rebuild_no_metadata() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let iox_object_store = make_iox_object_store();
// build catalog with some data
let catalog = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let catalog =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
// file w/o metadata
create_parquet_file_without_metadata(&object_store, server_id, db_name, 0).await;
create_parquet_file_without_metadata(&iox_object_store, 0).await;
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
PreservedCatalog::wipe(&iox_object_store).await.unwrap();
// rebuild (do not ignore errors)
let path = object_store.new_path();
let res = rebuild_catalog::<TestCatalogState>(
Arc::clone(&object_store),
&path,
server_id,
db_name.to_string(),
(),
false,
)
.await;
let res =
rebuild_catalog::<TestCatalogState>(Arc::clone(&iox_object_store), (), false).await;
assert!(dbg!(res.unwrap_err().to_string())
.starts_with("Cannot read IOx metadata from parquet file"));
// rebuild (ignore errors)
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name.to_string(),
(),
true,
)
.await
.unwrap();
let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), true)
.await
.unwrap();
assert!(state.parquet_files.is_empty());
assert_eq!(catalog.revision_counter(), 0);
}
@ -382,43 +314,27 @@ mod tests {
// There is no real need to create a checkpoint in this case. So here we delete all transaction files and then
// check that the rebuilt catalog will be gone afterwards. Note the difference to the `test_rebuild_empty` case
// where we can indeed prove the existence of a catalog (even though it is empty, i.e. has no files).
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let iox_object_store = make_iox_object_store();
// build catalog with some data (2 transactions + initial empty one)
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
assert_eq!(catalog.revision_counter(), 0);
// wipe catalog
drop(catalog);
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
PreservedCatalog::wipe(&iox_object_store).await.unwrap();
// rebuild
let path = object_store.new_path();
let catalog = rebuild_catalog::<TestCatalogState>(
Arc::clone(&object_store),
&path,
server_id,
db_name.to_string(),
(),
false,
)
.await
.unwrap();
let catalog = rebuild_catalog::<TestCatalogState>(Arc::clone(&iox_object_store), (), false)
.await
.unwrap();
drop(catalog);
// delete transaction files
let paths = object_store
let paths = iox_object_store
.list(None)
.await
.unwrap()
@ -432,36 +348,25 @@ mod tests {
.file_name
.map_or(false, |part| part.encoded().ends_with(".txn"))
{
object_store.delete(&path).await.unwrap();
iox_object_store.delete(&path).await.unwrap();
deleted = true;
}
}
assert!(deleted);
// catalog gone
assert!(
!PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
}
/// Creates new test server ID
fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
assert!(!PreservedCatalog::exists(&iox_object_store).await.unwrap());
}
pub async fn create_parquet_file(
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: Arc<str>,
iox_object_store: &Arc<IoxObjectStore>,
chunk_id: u32,
) -> CatalogParquetInfo {
let table_name = Arc::from("table1");
let partition_key = Arc::from("part1");
let (record_batches, _schema, _column_summaries, _num_rows) = make_record_batch("foo");
let storage = Storage::new(Arc::clone(object_store), server_id);
let storage = Storage::new(Arc::clone(iox_object_store));
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name),
Arc::clone(&partition_key),
@ -480,7 +385,7 @@ mod tests {
let (path, file_size_bytes, metadata) = storage
.write_to_object_store(
ChunkAddr {
db_name,
db_name: iox_object_store.database_name().into(),
table_name,
partition_key,
chunk_id,
@ -499,9 +404,7 @@ mod tests {
}
pub async fn create_parquet_file_without_metadata(
object_store: &Arc<ObjectStore>,
server_id: ServerId,
db_name: &str,
iox_object_store: &Arc<IoxObjectStore>,
chunk_id: u32,
) -> (DirsAndFileName, IoxParquetMetaData) {
let (record_batches, schema, _column_summaries, _num_rows) = make_record_batch("foo");
@ -520,9 +423,9 @@ mod tests {
let data = mem_writer.into_inner().unwrap();
let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap();
let storage = Storage::new(Arc::clone(object_store), server_id);
let storage = Storage::new(Arc::clone(iox_object_store));
let path = storage.location(&ChunkAddr {
db_name: Arc::from(db_name),
db_name: Arc::from(iox_object_store.database_name()),
table_name: Arc::from("table1"),
partition_key: Arc::from("part1"),
chunk_id,


@ -5,16 +5,18 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
use bytes::Bytes;
use data_types::chunk_metadata::ChunkAddr;
use datafusion::{
logical_plan::Expr,
physical_plan::{parquet::ParquetExec, ExecutionPlan, Partitioning, SendableRecordBatchStream},
};
use futures::StreamExt;
use internal_types::selection::Selection;
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use iox_object_store::IoxObjectStore;
use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use parquet::{
self,
arrow::ArrowWriter,
@ -22,11 +24,6 @@ use parquet::{
file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
};
use query::{exec::stream::AdapterStream, predicate::Predicate};
use bytes::Bytes;
use data_types::{chunk_metadata::ChunkAddr, server_id::ServerId};
use futures::StreamExt;
use parking_lot::Mutex;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
io::{Cursor, Seek, SeekFrom, Write},
@ -132,16 +129,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub struct Storage {
object_store: Arc<ObjectStore>,
server_id: ServerId,
iox_object_store: Arc<IoxObjectStore>,
}
impl Storage {
pub fn new(object_store: Arc<ObjectStore>, server_id: ServerId) -> Self {
Self {
object_store,
server_id,
}
pub fn new(iox_object_store: Arc<IoxObjectStore>) -> Self {
Self { iox_object_store }
}
/// Return full path including filename in the object store to save a chunk
@ -156,7 +149,7 @@ impl Storage {
// generate random UUID so that files are unique and never overwritten
let uuid = Uuid::new_v4();
let mut path = data_location(&self.object_store, self.server_id, &chunk_addr.db_name);
let mut path = self.iox_object_store.data_path();
path.push_dir(chunk_addr.table_name.as_ref());
path.push_dir(chunk_addr.partition_key.as_ref());
path.set_file_name(format!(
@ -236,7 +229,7 @@ impl Storage {
let data = Bytes::from(data);
let stream_data = Result::Ok(data);
self.object_store
self.iox_object_store
.put(
file_name,
futures::stream::once(async move { stream_data }),
@ -274,7 +267,7 @@ impl Storage {
predicate: Option<Expr>,
projection: Vec<usize>,
path: Path,
store: Arc<ObjectStore>,
store: Arc<IoxObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
) -> Result<()> {
// Size of each batch
@ -353,7 +346,7 @@ impl Storage {
selection: Selection<'_>,
schema: SchemaRef,
path: Path,
store: Arc<ObjectStore>,
store: Arc<IoxObjectStore>,
) -> Result<SendableRecordBatchStream> {
// fire up an async task that will fetch the parquet file
// locally, start it executing and send results
@ -439,32 +432,11 @@ impl TryClone for MemWriter {
}
}
/// Location where parquet data goes to.
///
/// Schema currently is:
///
/// ```text
/// <writer_id>/<database>/data
/// ```
pub(crate) fn data_location(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Path {
let mut path = object_store.new_path();
path.push_dir(server_id.to_string());
path.push_dir(db_name.to_string());
path.push_dir("data");
path
}
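The helper above goes away because the database-scoped wrapper already encodes the "<server_id>/<db_name>/" prefix; a rough equivalence sketch follows (chunk_data_root is a hypothetical name, not part of this diff).

use std::sync::Arc;

use iox_object_store::IoxObjectStore;
use object_store::path::Path;

// Hypothetical stand-in for the deleted data_location(): the data directory is
// now a single method call on the wrapper.
fn chunk_data_root(iox_object_store: &Arc<IoxObjectStore>) -> Path {
    iox_object_store.data_path() // yields "<server_id>/<db_name>/data/"
}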
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use super::*;
use crate::test_utils::{
create_partition_and_database_checkpoint, make_object_store, make_record_batch,
create_partition_and_database_checkpoint, make_iox_object_store, make_record_batch,
};
use arrow::array::{ArrayRef, StringArray};
use arrow_util::assert_batches_eq;
@ -538,12 +510,11 @@ mod tests {
assert_batches_eq!(&expected, &input_batches);
// create Storage
let server_id = ServerId::try_from(1).unwrap();
let db_name = Arc::from("my_db");
let table_name = Arc::from("my_table");
let partition_key = Arc::from("my_partition");
let chunk_id = 33;
let storage = Storage::new(make_object_store(), server_id);
let iox_object_store = make_iox_object_store();
let storage = Storage::new(Arc::clone(&iox_object_store));
// write the data in
let schema = batch.schema();
@ -569,7 +540,7 @@ mod tests {
let (path, _file_size_bytes, _metadata) = storage
.write_to_object_store(
ChunkAddr {
db_name,
db_name: iox_object_store.database_name().into(),
table_name,
partition_key,
chunk_id,
@ -580,13 +551,13 @@ mod tests {
.await
.expect("successfully wrote to object store");
let object_store = Arc::clone(&storage.object_store);
let iox_object_store = Arc::clone(&storage.iox_object_store);
let read_stream = Storage::read_filter(
&Predicate::default(),
Selection::All,
schema,
path,
object_store,
iox_object_store,
)
.expect("successfully called read_filter");
@ -599,10 +570,10 @@ mod tests {
#[test]
fn test_locations_are_unique() {
let server_id = ServerId::try_from(1).unwrap();
let storage = Storage::new(make_object_store(), server_id);
let iox_object_store = make_iox_object_store();
let storage = Storage::new(Arc::clone(&iox_object_store));
let chunk_addr = ChunkAddr {
db_name: Arc::from("my_db"),
db_name: iox_object_store.database_name().into(),
table_name: Arc::from("my_table"),
partition_key: Arc::from("my_partition"),
chunk_id: 13,


@ -12,8 +12,8 @@ mod tests {
use crate::{
metadata::IoxParquetMetaData,
test_utils::{
chunk_addr, load_parquet_from_store, make_chunk_given_record_batch, make_object_store,
make_record_batch, read_data_from_parquet_data,
chunk_addr, load_parquet_from_store, make_chunk_given_record_batch,
make_iox_object_store, make_record_batch, read_data_from_parquet_data,
},
};
@ -31,7 +31,7 @@ mod tests {
////////////////////
// Make an OS in memory
let store = make_object_store();
let store = make_iox_object_store();
////////////////////
// Store the data as a chunk and write it to the object store


@ -16,6 +16,7 @@ use data_types::{
chunk_metadata::ChunkAddr,
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
server_id::ServerId,
DatabaseName,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::MemoryStream;
@ -24,7 +25,8 @@ use internal_types::{
schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use object_store::{path::Path, ObjectStore, ObjectStoreApi};
use iox_object_store::IoxObjectStore;
use object_store::{path::Path, ObjectStore};
use parquet::{
arrow::{ArrowReader, ParquetFileArrowReader},
file::serialized_reader::{SerializedFileReader, SliceableCursor},
@ -53,14 +55,14 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
// This function is for test only
pub async fn load_parquet_from_store(
chunk: &ParquetChunk,
store: Arc<ObjectStore>,
store: Arc<IoxObjectStore>,
) -> Result<Vec<u8>> {
load_parquet_from_store_for_chunk(chunk, store).await
}
pub async fn load_parquet_from_store_for_chunk(
chunk: &ParquetChunk,
store: Arc<ObjectStore>,
store: Arc<IoxObjectStore>,
) -> Result<Vec<u8>> {
let path = chunk.path();
Ok(load_parquet_from_store_for_path(&path, store).await?)
@ -68,7 +70,7 @@ pub async fn load_parquet_from_store_for_chunk(
pub async fn load_parquet_from_store_for_path(
path: &Path,
store: Arc<ObjectStore>,
store: Arc<IoxObjectStore>,
) -> Result<Vec<u8>> {
let parquet_data = store
.get(path)
@ -99,17 +101,24 @@ pub fn chunk_addr(id: u32) -> ChunkAddr {
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk(
store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
column_prefix: &str,
addr: ChunkAddr,
) -> ParquetChunk {
let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_given_record_batch(store, record_batches, schema, addr, column_summaries).await
make_chunk_given_record_batch(
iox_object_store,
record_batches,
schema,
addr,
column_summaries,
)
.await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk_no_row_group(
store: Arc<ObjectStore>,
store: Arc<IoxObjectStore>,
column_prefix: &str,
addr: ChunkAddr,
) -> ParquetChunk {
@ -121,14 +130,13 @@ pub async fn make_chunk_no_row_group(
///
/// TODO: This code creates a chunk that isn't hooked up with metrics
pub async fn make_chunk_given_record_batch(
store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
record_batches: Vec<RecordBatch>,
schema: Schema,
addr: ChunkAddr,
column_summaries: Vec<ColumnSummary>,
) -> ParquetChunk {
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let storage = Storage::new(Arc::clone(&store), server_id);
let storage = Storage::new(Arc::clone(&iox_object_store));
let table_summary = TableSummary {
name: addr.table_name.to_string(),
@ -166,7 +174,7 @@ pub async fn make_chunk_given_record_batch(
Arc::new(table_summary),
Arc::new(schema),
path,
Arc::clone(&store),
Arc::clone(&iox_object_store),
file_size_bytes,
Arc::new(parquet_metadata),
ChunkMetrics::new_unregistered(),
@ -708,9 +716,20 @@ pub fn make_record_batch(
(record_batches, schema, summaries, num_rows)
}
/// Creates new in-memory object store for testing.
pub fn make_object_store() -> Arc<ObjectStore> {
Arc::new(ObjectStore::new_in_memory())
/// Creates new test server ID
pub fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
}
/// Creates a new in-memory `IoxObjectStore` for testing.
pub fn make_iox_object_store() -> Arc<IoxObjectStore> {
let server_id = make_server_id();
let database_name = DatabaseName::new("db1").unwrap();
Arc::new(IoxObjectStore::new(
Arc::new(ObjectStore::new_in_memory()),
server_id,
&database_name,
))
}
pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) -> Vec<RecordBatch> {
@ -749,12 +768,12 @@ pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) ->
///
/// See [`make_chunk`] for details.
pub async fn make_metadata(
object_store: &Arc<ObjectStore>,
iox_object_store: &Arc<IoxObjectStore>,
column_prefix: &str,
addr: ChunkAddr,
) -> (Path, IoxParquetMetaData) {
let chunk = make_chunk(Arc::clone(object_store), column_prefix, addr).await;
let parquet_data = load_parquet_from_store(&chunk, Arc::clone(object_store))
let chunk = make_chunk(Arc::clone(iox_object_store), column_prefix, addr).await;
let parquet_data = load_parquet_from_store(&chunk, Arc::clone(iox_object_store))
.await
.unwrap();
(


@ -26,6 +26,7 @@ hashbrown = "0.11"
influxdb_iox_client = { path = "../influxdb_iox_client" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
iox_object_store = { path = "../iox_object_store" }
itertools = "0.10.1"
lifecycle = { path = "../lifecycle" }
metrics = { path = "../metrics" }


@ -1,30 +1,35 @@
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use data_types::database_state::DatabaseStateCode;
use data_types::server_id::ServerId;
use data_types::{database_rules::DatabaseRules, DatabaseName};
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use crate::{
db::{
load::{create_preserved_catalog, load_or_create_preserved_catalog},
DatabaseToCommit,
},
ApplicationState, Db, DB_RULES_FILE_NAME,
};
use bytes::BytesMut;
use data_types::{
database_rules::DatabaseRules, database_state::DatabaseStateCode, server_id::ServerId,
DatabaseName,
};
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use generated_types::database_rules::encode_database_rules;
use internal_types::freezable::Freezable;
use object_store::path::{ObjectStorePath, Path};
use iox_object_store::IoxObjectStore;
use object_store::{
path::{ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use parquet_file::catalog::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan;
use snafu::{ResultExt, Snafu};
use tokio::sync::Notify;
use tokio::task::JoinError;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
use crate::db::load::{create_preserved_catalog, load_or_create_preserved_catalog};
use crate::db::DatabaseToCommit;
use crate::{ApplicationState, Db, DB_RULES_FILE_NAME};
use bytes::BytesMut;
use object_store::{ObjectStore, ObjectStoreApi};
use parquet_file::catalog::PreservedCatalog;
const INIT_BACKOFF: Duration = Duration::from_secs(1);
#[derive(Debug, Snafu)]
@ -101,12 +106,18 @@ impl Database {
pub fn new(application: Arc<ApplicationState>, config: DatabaseConfig) -> Self {
info!(db_name=%config.name, store_prefix=%config.store_prefix.display(), "new database");
let iox_object_store = Arc::new(IoxObjectStore::new(
Arc::clone(application.object_store()),
config.server_id,
&config.name,
));
let shared = Arc::new(DatabaseShared {
config,
application,
shutdown: Default::default(),
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
state_notify: Default::default(),
iox_object_store,
});
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
@ -128,7 +139,11 @@ impl Database {
create_preserved_catalog(
db_name.as_str(),
Arc::clone(application.object_store()),
Arc::new(IoxObjectStore::new(
Arc::clone(application.object_store()),
server_id,
&db_name,
)),
server_id,
Arc::clone(application.metric_registry()),
true,
@ -184,6 +199,10 @@ impl Database {
.map(|state| Arc::clone(&state.db))
}
pub fn iox_object_store(&self) -> Arc<IoxObjectStore> {
Arc::clone(&self.shared.iox_object_store)
}
/// Returns Ok(()) when the Database is initialized, or the error
/// if one is encountered
pub async fn wait_for_init(&self) -> Result<(), Arc<InitError>> {
@ -237,14 +256,10 @@ impl Database {
Ok(async move {
let db_name = &shared.config.name;
PreservedCatalog::wipe(
shared.application.object_store().as_ref(),
shared.config.server_id,
db_name,
)
.await
.map_err(Box::new)
.context(WipePreservedCatalog { db_name })?;
PreservedCatalog::wipe(&shared.iox_object_store)
.await
.map_err(Box::new)
.context(WipePreservedCatalog { db_name })?;
{
let mut state = shared.state.write();
@ -331,6 +346,9 @@ struct DatabaseShared {
/// Notify that the database state has changed
state_notify: Notify,
/// The object store interface for this database
iox_object_store: Arc<IoxObjectStore>,
}
/// The background worker for `Database` - there should only ever be one
@ -627,7 +645,7 @@ impl DatabaseStateRulesLoaded {
) -> Result<DatabaseStateCatalogLoaded, InitError> {
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
shared.config.name.as_str(),
Arc::clone(shared.application.object_store()),
Arc::clone(&shared.iox_object_store),
shared.config.server_id,
Arc::clone(shared.application.metric_registry()),
shared.config.wipe_catalog_on_error,
@ -645,7 +663,11 @@ impl DatabaseStateRulesLoaded {
let database_to_commit = DatabaseToCommit {
server_id: shared.config.server_id,
object_store: Arc::clone(shared.application.object_store()),
iox_object_store: Arc::new(IoxObjectStore::new(
Arc::clone(shared.application.object_store()),
shared.config.server_id,
&shared.config.name,
)),
exec: Arc::clone(shared.application.executor()),
rules: Arc::clone(&self.rules),
preserved_catalog,


@ -28,9 +28,10 @@ use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, Sequence, SequencedEntry, TableBatch};
use futures::{stream::BoxStream, StreamExt};
use internal_types::schema::Schema;
use iox_object_store::IoxObjectStore;
use metrics::KeyValue;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{debug, error, info};
use parking_lot::{Mutex, RwLock};
use parquet_file::{
@ -317,7 +318,7 @@ pub struct Db {
server_id: ServerId, // this is also the Query Server ID
/// Interface to use for persistence
store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
/// Executor for running queries
exec: Arc<Executor>,
@ -394,7 +395,7 @@ pub struct Db {
#[derive(Debug)]
pub(crate) struct DatabaseToCommit {
pub(crate) server_id: ServerId,
pub(crate) object_store: Arc<ObjectStore>,
pub(crate) iox_object_store: Arc<IoxObjectStore>,
pub(crate) exec: Arc<Executor>,
pub(crate) preserved_catalog: PreservedCatalog,
pub(crate) catalog: Catalog,
@ -408,7 +409,7 @@ impl Db {
let rules = RwLock::new(database_to_commit.rules);
let server_id = database_to_commit.server_id;
let store = Arc::clone(&database_to_commit.object_store);
let iox_object_store = Arc::clone(&database_to_commit.iox_object_store);
let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry);
let metric_labels = database_to_commit.catalog.metric_labels.clone();
@ -432,7 +433,7 @@ impl Db {
let this = Self {
rules,
server_id,
store,
iox_object_store,
exec: database_to_commit.exec,
preserved_catalog: Arc::new(database_to_commit.preserved_catalog),
catalog,
@ -2666,9 +2667,10 @@ mod tests {
assert_eq!(path_list[0], path);
// Now read data from that path
let parquet_data = load_parquet_from_store_for_path(&path_list[0], object_store)
.await
.unwrap();
let parquet_data =
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
// Read metadata at file level
let schema = parquet_metadata.read_schema().unwrap();
@ -2808,9 +2810,10 @@ mod tests {
assert_eq!(path_list[0], path);
// Now read data from that path
let parquet_data = load_parquet_from_store_for_path(&path_list[0], object_store)
.await
.unwrap();
let parquet_data =
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
.await
.unwrap();
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
// Read metadata at file level
let schema = parquet_metadata.read_schema().unwrap();
@ -3864,14 +3867,10 @@ mod tests {
// ==================== check: empty catalog created ====================
// at this point, an empty preserved catalog exists
let maybe_preserved_catalog = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let maybe_preserved_catalog =
PreservedCatalog::load::<TestCatalogState>(Arc::clone(&db.iox_object_store), ())
.await
.unwrap();
assert!(maybe_preserved_catalog.is_some());
// ==================== do: write data to parquet ====================
@ -3901,15 +3900,11 @@ mod tests {
}
}
paths_expected.sort();
let (_preserved_catalog, catalog) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap()
.unwrap();
let (_preserved_catalog, catalog) =
PreservedCatalog::load::<TestCatalogState>(Arc::clone(&db.iox_object_store), ())
.await
.unwrap()
.unwrap();
let paths_actual = {
let mut tmp: Vec<String> = catalog.parquet_files.keys().map(|p| p.display()).collect();
tmp.sort();


@ -953,7 +953,7 @@ mod tests {
use mutable_buffer::chunk::ChunkMetrics as MBChunkMetrics;
use parquet_file::{
chunk::ParquetChunk,
test_utils::{make_chunk as make_parquet_chunk_with_store, make_object_store},
test_utils::{make_chunk as make_parquet_chunk_with_store, make_iox_object_store},
};
#[test]
@ -1120,8 +1120,8 @@ mod tests {
}
async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk {
let object_store = make_object_store();
make_parquet_chunk_with_store(object_store, "foo", addr).await
let iox_object_store = make_iox_object_store();
make_parquet_chunk_with_store(iox_object_store, "foo", addr).await
}
fn chunk_addr() -> ChunkAddr {
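
Spelled out with its imports, the updated helper from the hunk above looks as follows; it adds nothing beyond the diff and only makes the changed argument type explicit (the helper presumably returns an `Arc<IoxObjectStore>`, and `"foo"` is the placeholder table name the existing test already uses):

```rust
use data_types::chunk_metadata::ChunkAddr;
use parquet_file::{
    chunk::ParquetChunk,
    test_utils::{make_chunk as make_parquet_chunk_with_store, make_iox_object_store},
};

// make_iox_object_store() replaces make_object_store(), so the chunk helper
// now hands the IOx-specific wrapper, not a raw object store, to the
// parquet chunk constructor.
async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk {
    let iox_object_store = make_iox_object_store();
    make_parquet_chunk_with_store(iox_object_store, "foo", addr).await
}
```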

View File

@ -74,7 +74,7 @@ pub(super) fn write_chunk_to_object_store(
let partition = partition.into_data().partition;
// Create a storage to save data of this chunk
let storage = Storage::new(Arc::clone(&db.store), db.server_id);
let storage = Storage::new(Arc::clone(&db.iox_object_store));
let catalog_transactions_until_checkpoint = db
.rules
@ -140,7 +140,7 @@ pub(super) fn write_chunk_to_object_store(
let parquet_chunk = Arc::new(
ParquetChunk::new(
path.clone(),
Arc::clone(&db.store),
Arc::clone(&db.iox_object_store),
file_size_bytes,
Arc::clone(&parquet_metadata),
Arc::clone(&table_name),
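
The persistence path follows the same pattern: `Storage` and the resulting `ParquetChunk` now receive the database-scoped handle, and the separate server-id argument disappears because the wrapper's root path already encodes `<server_id>/<db_name>/`. A sketch of the narrowed `Storage` constructor, assuming it lives at `parquet_file::storage::Storage` (the import path is not shown in this excerpt):

```rust
use std::sync::Arc;

use iox_object_store::IoxObjectStore;
use parquet_file::storage::Storage;

// Hypothetical helper mirroring the call in the hunk above: no server id is
// passed any more, since the wrapper already scopes writes to this database.
fn chunk_writer(iox_object_store: &Arc<IoxObjectStore>) -> Storage {
    Storage::new(Arc::clone(iox_object_store))
}
```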

View File

@ -1,11 +1,11 @@
//! Functionality to load a [`Catalog`](crate::db::catalog::Catalog) and other information from a
//! [`PreservedCatalog`](parquet_file::catalog::PreservedCatalog).
use std::sync::Arc;
use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
use data_types::server_id::ServerId;
use iox_object_store::IoxObjectStore;
use metrics::{KeyValue, MetricRegistry};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{error, info};
use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
@ -13,8 +13,7 @@ use parquet_file::{
};
use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
use snafu::{ResultExt, Snafu};
use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
use std::sync::Arc;
#[derive(Debug, Snafu)]
pub enum Error {
@ -48,7 +47,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// <https://github.com/influxdata/influxdb_iox/issues/1522>
pub async fn load_or_create_preserved_catalog(
db_name: &str,
object_store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
wipe_on_error: bool,
@ -56,9 +55,7 @@ pub async fn load_or_create_preserved_catalog(
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
// first try to load existing catalogs
match PreservedCatalog::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
Arc::clone(&iox_object_store),
LoaderEmptyInput::new(
db_name,
server_id,
@ -87,7 +84,7 @@ pub async fn load_or_create_preserved_catalog(
create_preserved_catalog(
db_name,
Arc::clone(&object_store),
Arc::clone(&iox_object_store),
server_id,
Arc::clone(&metrics_registry),
skip_replay,
@ -100,13 +97,13 @@ pub async fn load_or_create_preserved_catalog(
// broken => wipe for now (at least during early iterations)
error!("cannot load catalog, so wipe it: {}", e);
PreservedCatalog::wipe(&object_store, server_id, db_name)
PreservedCatalog::wipe(&iox_object_store)
.await
.context(CannotWipeCatalog)?;
create_preserved_catalog(
db_name,
Arc::clone(&object_store),
Arc::clone(&iox_object_store),
server_id,
Arc::clone(&metrics_registry),
skip_replay,
@ -124,15 +121,13 @@ pub async fn load_or_create_preserved_catalog(
/// This will fail if a preserved catalog already exists.
pub async fn create_preserved_catalog(
db_name: &str,
object_store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
skip_replay: bool,
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
let (preserved_catalog, loader) = PreservedCatalog::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
Arc::clone(&iox_object_store),
LoaderEmptyInput::new(
db_name,
server_id,
@ -206,7 +201,7 @@ impl CatalogState for Loader {
fn add(
&mut self,
object_store: Arc<ObjectStore>,
iox_object_store: Arc<IoxObjectStore>,
info: CatalogParquetInfo,
) -> parquet_file::catalog::Result<()> {
use parquet_file::catalog::{MetadataExtractFailed, ReplayPlanError, SchemaError};
@ -237,8 +232,8 @@ impl CatalogState for Loader {
let metrics = ParquetChunkMetrics::new(&metrics);
let parquet_chunk = ParquetChunk::new(
object_store.path_from_dirs_and_filename(info.path.clone()),
object_store,
iox_object_store.path_from_dirs_and_filename(info.path.clone()),
iox_object_store,
info.file_size_bytes,
info.metadata,
Arc::clone(&iox_md.table_name),
@ -308,38 +303,34 @@ impl CatalogState for Loader {
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use super::*;
use crate::db::checkpoint_data_from_catalog;
use data_types::DatabaseName;
use object_store::ObjectStore;
use parquet_file::catalog::{
test_helpers::{assert_catalog_state_implementation, TestCatalogState},
CheckpointData,
};
use crate::db::checkpoint_data_from_catalog;
use super::*;
use std::convert::TryFrom;
#[tokio::test]
async fn load_or_create_preserved_catalog_recovers_from_error() {
let object_store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap();
let db_name = "preserved_catalog_test";
let db_name = DatabaseName::new("preserved_catalog_test").unwrap();
let iox_object_store = Arc::new(IoxObjectStore::new(object_store, server_id, &db_name));
let (preserved_catalog, _catalog) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (preserved_catalog, _catalog) =
PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
.await
.unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
let metrics_registry = Arc::new(metrics::MetricRegistry::new());
load_or_create_preserved_catalog(
db_name,
object_store,
&db_name,
iox_object_store,
server_id,
metrics_registry,
true,
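
For callers of this module, the net effect of the signature changes is that `load_or_create_preserved_catalog` and `create_preserved_catalog` accept the database-scoped store where the `object_store` argument used to be, while the name, server id and metrics registry are still threaded through to the loader. A hypothetical caller, written as if it lived in this module; the trailing flags follow the order suggested by the surrounding hunks (`wipe_on_error`, then `skip_replay`), which is partly truncated in this excerpt and should be treated as an assumption:

```rust
use std::sync::Arc;

use data_types::server_id::ServerId;
use iox_object_store::IoxObjectStore;
use metrics::MetricRegistry;

async fn open_catalog_for_db(
    db_name: &str,
    iox_object_store: Arc<IoxObjectStore>,
    server_id: ServerId,
    metrics_registry: Arc<MetricRegistry>,
) -> Result<()> {
    // The wrapper replaces the old Arc<ObjectStore> argument; db_name and
    // server_id are still passed for the loader's catalog and metrics state.
    let (_preserved_catalog, _catalog, _replay_plan) = load_or_create_preserved_catalog(
        db_name,
        Arc::clone(&iox_object_store),
        server_id,
        Arc::clone(&metrics_registry),
        false, // wipe_on_error
        false, // skip_replay (assumed; truncated in this excerpt)
    )
    .await?;
    Ok(())
}
```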

View File

@ -68,13 +68,10 @@
clippy::future_not_send
)]
use std::sync::Arc;
use async_trait::async_trait;
use data_types::database_rules::ShardConfig;
use data_types::error::ErrorLogger;
use data_types::{
database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardId, Sink},
database_rules::{DatabaseRules, NodeGroup, RoutingRules, ShardConfig, ShardId, Sink},
error::ErrorLogger,
job::Job,
server_id::ServerId,
{DatabaseName, DatabaseNameError},
@ -88,26 +85,26 @@ use influxdb_line_protocol::ParsedLine;
use internal_types::freezable::Freezable;
use lifecycle::LockableChunk;
use metrics::{KeyValue, MetricObserverBuilder};
use object_store::{ObjectStore, ObjectStoreApi};
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use query::exec::Executor;
use rand::seq::SliceRandom;
use resolver::Resolver;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::task::JoinError;
use std::sync::Arc;
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
use tracker::{TaskTracker, TrackedFutureExt};
pub use application::ApplicationState;
pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer};
pub use db::Db;
pub use job::JobRegistry;
use object_store::path::parsed::DirsAndFileName;
use object_store::path::{ObjectStorePath, Path};
pub use resolver::{GrpcConnectionString, RemoteTemplate};
use tokio::sync::Notify;
mod application;
mod connection;
@ -671,7 +668,7 @@ where
/// Tells the server the set of rules for a database.
///
/// Waits until the database has initialized or failed to do so
pub async fn create_database(&self, rules: DatabaseRules) -> Result<()> {
pub async fn create_database(&self, rules: DatabaseRules) -> Result<Arc<Database>> {
let db_name = rules.name.clone();
let object_store = self.shared.application.object_store().as_ref();
@ -727,7 +724,7 @@ where
database.wait_for_init().await.context(DatabaseInit)?;
Ok(())
Ok(database)
}
pub async fn write_pb(&self, database_batch: pb::DatabaseBatch) -> Result<()> {
@ -1274,6 +1271,25 @@ fn database_store_prefix(
#[cfg(test)]
mod tests {
use super::*;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use bytes::Bytes;
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
use data_types::{
chunk_metadata::ChunkAddr,
database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
},
};
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use iox_object_store::IoxObjectStore;
use metrics::TestMetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use std::{
convert::{Infallible, TryFrom},
sync::{
@ -1282,27 +1298,8 @@ mod tests {
},
time::{Duration, Instant},
};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use futures::TryStreamExt;
use arrow_util::assert_batches_eq;
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
use data_types::chunk_metadata::ChunkAddr;
use data_types::database_rules::{
HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, NO_SHARD_CONFIG,
};
use generated_types::database_rules::decode_database_rules;
use influxdb_line_protocol::parse_lines;
use metrics::TestMetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::{test_helpers::TestCatalogState, PreservedCatalog};
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
use test_helpers::assert_contains;
use super::*;
const ARBITRARY_DEFAULT_TIME: i64 = 456;
fn make_application() -> Arc<ApplicationState> {
@ -1440,7 +1437,7 @@ mod tests {
async fn create_simple_database<M>(
server: &Server<M>,
name: impl Into<String> + Send,
) -> Result<()>
) -> Result<Arc<Database>>
where
M: ConnectionManager + Send + Sync,
{
@ -2013,11 +2010,11 @@ mod tests {
// 3. existing one, but rules file is broken => can be wiped, will not exist afterwards
// 4. existing one, but catalog is broken => can be wiped, will exist afterwards
// 5. recently (during server lifecycle) created one => cannot be wiped
let db_name_existing = DatabaseName::new("db_existing".to_string()).unwrap();
let db_name_non_existing = DatabaseName::new("db_non_existing".to_string()).unwrap();
let db_name_rules_broken = DatabaseName::new("db_broken_rules".to_string()).unwrap();
let db_name_catalog_broken = DatabaseName::new("db_broken_catalog".to_string()).unwrap();
let db_name_created = DatabaseName::new("db_created".to_string()).unwrap();
let db_name_existing = DatabaseName::new("db_existing").unwrap();
let db_name_non_existing = DatabaseName::new("db_non_existing").unwrap();
let db_name_rules_broken = DatabaseName::new("db_broken_rules").unwrap();
let db_name_catalog_broken = DatabaseName::new("db_broken_catalog").unwrap();
let db_name_created = DatabaseName::new("db_created").unwrap();
// setup
let application = make_application();
@ -2029,7 +2026,7 @@ mod tests {
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
create_simple_database(&server, db_name_existing.clone())
let existing = create_simple_database(&server, db_name_existing.clone())
.await
.expect("failed to create database");
@ -2037,7 +2034,7 @@ mod tests {
.await
.expect("failed to create database");
create_simple_database(&server, db_name_catalog_broken.clone())
let catalog_broken = create_simple_database(&server, db_name_catalog_broken.clone())
.await
.expect("failed to create database");
@ -2057,15 +2054,11 @@ mod tests {
.await
.unwrap();
let (preserved_catalog, _catalog) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&store),
server_id,
db_name_catalog_broken.to_string(),
(),
)
.await
.unwrap()
.unwrap();
let (preserved_catalog, _catalog) =
PreservedCatalog::load::<TestCatalogState>(catalog_broken.iox_object_store(), ())
.await
.unwrap()
.unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
@ -2121,27 +2114,23 @@ mod tests {
.to_string(),
"error wiping preserved catalog: database (db_existing) in invalid state (Initialized) for transition (WipePreservedCatalog)"
);
assert!(PreservedCatalog::exists(
application.object_store().as_ref(),
server_id,
db_name_existing.as_str()
)
.await
.unwrap());
assert!(PreservedCatalog::exists(&existing.iox_object_store(),)
.await
.unwrap());
// 2. cannot wipe non-existent DB
assert!(matches!(
server.database(&db_name_non_existing).unwrap_err(),
Error::DatabaseNotFound { .. }
));
PreservedCatalog::new_empty::<TestCatalogState>(
let non_existing_iox_object_store = Arc::new(IoxObjectStore::new(
Arc::clone(application.object_store()),
server_id,
db_name_non_existing.to_string(),
(),
)
.await
.unwrap();
&db_name_non_existing,
));
PreservedCatalog::new_empty::<TestCatalogState>(non_existing_iox_object_store, ())
.await
.unwrap();
assert_eq!(
server
.wipe_preserved_catalog(&db_name_non_existing)
@ -2182,13 +2171,9 @@ mod tests {
database.wait_for_init().await.unwrap();
assert!(PreservedCatalog::exists(
application.object_store().as_ref(),
server_id,
&db_name_catalog_broken.to_string()
)
.await
.unwrap());
assert!(PreservedCatalog::exists(&catalog_broken.iox_object_store())
.await
.unwrap());
assert!(database.init_error().is_none());
assert!(server.db(&db_name_catalog_broken).is_ok());
@ -2200,7 +2185,7 @@ mod tests {
.expect("DB writable");
// 5. cannot wipe if DB was just created
server
let created = server
.create_database(DatabaseRules::new(db_name_created.clone()))
.await
.unwrap();
@ -2212,35 +2197,32 @@ mod tests {
.to_string(),
"error wiping preserved catalog: database (db_created) in invalid state (Initialized) for transition (WipePreservedCatalog)"
);
assert!(PreservedCatalog::exists(
application.object_store().as_ref(),
server_id,
&db_name_created.to_string()
)
.await
.unwrap());
assert!(PreservedCatalog::exists(&created.iox_object_store())
.await
.unwrap());
}
#[tokio::test]
async fn cannot_create_db_when_catalog_is_present() {
let application = make_application();
let server_id = ServerId::try_from(1).unwrap();
let db_name = "my_db";
let db_name = DatabaseName::new("my_db").unwrap();
// setup server
let server = make_server(Arc::clone(&application));
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
// create catalog
PreservedCatalog::new_empty::<TestCatalogState>(
let iox_object_store = Arc::new(IoxObjectStore::new(
Arc::clone(application.object_store()),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
&db_name,
));
// create catalog
PreservedCatalog::new_empty::<TestCatalogState>(iox_object_store, ())
.await
.unwrap();
// creating database will now result in an error
let err = create_simple_database(&server, db_name).await.unwrap_err();
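
One behavioural change in this file is worth calling out: `create_database` (and the `create_simple_database` test helper) now return the created `Arc<Database>` rather than `()`, and the tests probe the preserved catalog through the handle's `iox_object_store()` accessor. A hypothetical helper in the spirit of the test module (it assumes the module's existing imports; the helper name is made up):

```rust
// Mirrors create_simple_database's new shape: the Database handle is returned
// so callers can inspect its database-scoped object store.
async fn create_and_check<M>(server: &Server<M>, name: &str) -> Arc<Database>
where
    M: ConnectionManager + Send + Sync,
{
    let db_name = DatabaseName::new(name.to_string()).unwrap();
    let database = server
        .create_database(DatabaseRules::new(db_name))
        .await
        .unwrap();

    // The preserved-catalog probe now goes through the database-scoped store
    // instead of (object_store, server_id, name).
    assert!(PreservedCatalog::exists(&database.iox_object_store())
        .await
        .unwrap());

    database
}
```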

View File

@ -1,21 +1,19 @@
use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
use data_types::{
chunk_metadata::{ChunkStorage, ChunkSummary},
database_rules::{DatabaseRules, PartitionTemplate, TemplatePart},
server_id::ServerId,
DatabaseName,
};
use object_store::ObjectStore;
use persistence_windows::checkpoint::ReplayPlan;
use query::{exec::Executor, QueryDatabase};
use write_buffer::config::WriteBufferConfig;
use crate::{
db::{load::load_or_create_preserved_catalog, DatabaseToCommit, Db},
JobRegistry,
};
use data_types::database_rules::LifecycleRules;
use data_types::{
chunk_metadata::{ChunkStorage, ChunkSummary},
database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
server_id::ServerId,
DatabaseName,
};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;
use persistence_windows::checkpoint::ReplayPlan;
use query::{exec::Executor, QueryDatabase};
use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
use write_buffer::config::WriteBufferConfig;
// A wrapper around a Db and a metrics registry allowing for isolated testing
// of a Db and its metrics.
@ -55,9 +53,12 @@ impl TestDbBuilder {
let db_name = self
.db_name
.unwrap_or_else(|| DatabaseName::new("placeholder").unwrap());
let object_store = self
.object_store
.unwrap_or_else(|| Arc::new(ObjectStore::new_in_memory()));
let iox_object_store = Arc::new(IoxObjectStore::new(
self.object_store
.unwrap_or_else(|| Arc::new(ObjectStore::new_in_memory())),
server_id,
&db_name,
));
// deterministic thread and concurrency count
let mut exec = Executor::new(1);
@ -68,7 +69,7 @@ impl TestDbBuilder {
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
db_name.as_str(),
Arc::clone(&object_store),
Arc::clone(&iox_object_store),
server_id,
Arc::clone(&metrics_registry),
false,
@ -103,7 +104,7 @@ impl TestDbBuilder {
let database_to_commit = DatabaseToCommit {
rules: Arc::new(rules),
server_id,
object_store,
iox_object_store,
preserved_catalog,
catalog,
write_buffer: self.write_buffer,
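
Finally, the test builder follows suit: instead of storing a bare `Arc<ObjectStore>` in `DatabaseToCommit`, it wraps the provided store, or an in-memory default, into an `IoxObjectStore` keyed by the server id and database name. A hypothetical free-standing version of that step (the function name is made up; the construction matches the hunk above):

```rust
use std::sync::Arc;

use data_types::{server_id::ServerId, DatabaseName};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;

// Fall back to an in-memory ObjectStore, then scope it to this database,
// mirroring what the builder now does before constructing the Db.
fn test_iox_object_store(
    object_store: Option<Arc<ObjectStore>>,
    server_id: ServerId,
    db_name: &DatabaseName<'_>,
) -> Arc<IoxObjectStore> {
    Arc::new(IoxObjectStore::new(
        object_store.unwrap_or_else(|| Arc::new(ObjectStore::new_in_memory())),
        server_id,
        db_name,
    ))
}
```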