test: allow creation of deterministic chunks and transactions

pull/24376/head
Marco Neumann 2021-08-31 13:30:28 +02:00
parent 06833110ab
commit 70a5ffeae7
4 changed files with 95 additions and 15 deletions

View File

@@ -30,6 +30,16 @@ impl ParquetFilePath {
}
}
/// Create a location for testing purposes w/ a fixed UUID.
pub fn new_for_testing(chunk_addr: &ChunkAddr, uuid: Uuid) -> Self {
Self {
table_name: Arc::clone(&chunk_addr.table_name),
partition_key: Arc::clone(&chunk_addr.partition_key),
chunk_id: chunk_addr.chunk_id,
uuid,
}
}
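A minimal usage sketch (the helper function and the choice of `Uuid::nil()` are illustrative, not part of this change): pinning the UUID makes the generated parquet file location identical across test runs, so path assertions stay stable.

// Illustrative helper, assuming a ChunkAddr is already at hand; the nil UUID
// pins the otherwise-random component of the parquet file path.
fn deterministic_parquet_path(addr: &ChunkAddr) -> ParquetFilePath {
    ParquetFilePath::new_for_testing(addr, Uuid::nil())
}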
/// Turn this into directories and file names to be added to a root path or to be serialized
/// in protobuf.
pub fn relative_dirs_and_file_name(&self) -> DirsAndFileName {

View File

@@ -239,6 +239,9 @@ pub struct PreservedCatalog {
transaction_semaphore: Semaphore,
iox_object_store: Arc<IoxObjectStore>,
fixed_uuid: Option<Uuid>,
fixed_timestamp: Option<DateTime<Utc>>,
}
impl PreservedCatalog {
@@ -309,6 +312,37 @@ impl PreservedCatalog {
iox_object_store: Arc<IoxObjectStore>,
state_data: S::EmptyInput,
) -> Result<(Self, S)>
where
S: CatalogState + Send + Sync,
{
Self::new_empty_inner::<S>(iox_object_store, state_data, None, None).await
}
/// Same as [`new_empty`](Self::new_empty) but for testing.
pub async fn new_empty_for_testing<S>(
iox_object_store: Arc<IoxObjectStore>,
state_data: S::EmptyInput,
fixed_uuid: Uuid,
fixed_timestamp: DateTime<Utc>,
) -> Result<(Self, S)>
where
S: CatalogState + Send + Sync,
{
Self::new_empty_inner::<S>(
iox_object_store,
state_data,
Some(fixed_uuid),
Some(fixed_timestamp),
)
.await
}
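A hedged sketch of how a test might build a fully deterministic catalog; `TestCatalogState`, its `EmptyInput` of `()`, and the concrete timestamp are assumptions made for illustration only:

// Illustrative test setup: with a fixed UUID and start timestamp, every
// transaction this catalog writes is byte-for-byte reproducible.
use chrono::TimeZone;

let (catalog, _state) = PreservedCatalog::new_empty_for_testing::<TestCatalogState>(
    Arc::clone(&iox_object_store),
    (),
    Uuid::nil(),
    Utc.ymd(2021, 8, 31).and_hms(0, 0, 0),
)
.await
.unwrap();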
pub async fn new_empty_inner<S>(
iox_object_store: Arc<IoxObjectStore>,
state_data: S::EmptyInput,
fixed_uuid: Option<Uuid>,
fixed_timestamp: Option<DateTime<Utc>>,
) -> Result<(Self, S)>
where
S: CatalogState + Send + Sync,
{
@@ -321,6 +355,8 @@ impl PreservedCatalog {
previous_tkey: RwLock::new(None),
transaction_semaphore: Semaphore::new(1),
iox_object_store,
fixed_uuid,
fixed_timestamp,
};
// add empty transaction
@@ -444,6 +480,8 @@ impl PreservedCatalog {
previous_tkey: RwLock::new(last_tkey),
transaction_semaphore: Semaphore::new(1),
iox_object_store,
fixed_uuid: None,
fixed_timestamp: None,
},
state,
)))
@@ -456,13 +494,9 @@ impl PreservedCatalog {
/// the state after `await` (esp. post-blocking). This system is fair, which means that
/// transactions are given out in the order they were requested.
pub async fn open_transaction(&self) -> TransactionHandle<'_> {
self.open_transaction_with_uuid(Uuid::new_v4()).await
}
/// Crate-private API to open a transaction with a specified UUID. Should only be used for
/// catalog rebuilding or with a fresh V4-UUID!
pub(crate) async fn open_transaction_with_uuid(&self, uuid: Uuid) -> TransactionHandle<'_> {
TransactionHandle::new(self, uuid).await
let uuid = self.fixed_uuid.unwrap_or_else(Uuid::new_v4);
let start_timestamp = self.fixed_timestamp.unwrap_or_else(Utc::now);
TransactionHandle::new(self, uuid, start_timestamp).await
}
/// Get latest revision counter.
@@ -501,7 +535,11 @@ struct OpenTransaction {
impl OpenTransaction {
/// Private API to create a new transaction; users should always use
/// [`PreservedCatalog::open_transaction`].
fn new(previous_tkey: &Option<TransactionKey>, uuid: Uuid) -> Self {
fn new(
previous_tkey: &Option<TransactionKey>,
uuid: Uuid,
start_timestamp: DateTime<Utc>,
) -> Self {
let (revision_counter, previous_uuid) = match previous_tkey {
Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()),
None => (0, String::new()),
@@ -514,7 +552,7 @@ impl OpenTransaction {
uuid: uuid.to_string(),
revision_counter,
previous_uuid,
start_timestamp: Some(Utc::now().into()),
start_timestamp: Some(start_timestamp.into()),
encoding: proto::transaction::Encoding::Delta.into(),
},
}
@@ -717,7 +755,11 @@ pub struct TransactionHandle<'c> {
}
impl<'c> TransactionHandle<'c> {
async fn new(catalog: &'c PreservedCatalog, uuid: Uuid) -> TransactionHandle<'c> {
async fn new(
catalog: &'c PreservedCatalog,
uuid: Uuid,
start_timestamp: DateTime<Utc>,
) -> TransactionHandle<'c> {
// first acquire semaphore (which is only being used for transactions), then get state lock
let permit = catalog
.transaction_semaphore
@@ -726,7 +768,7 @@ impl<'c> TransactionHandle<'c> {
.expect("semaphore should not be closed");
let previous_tkey_guard = catalog.previous_tkey.write();
let transaction = OpenTransaction::new(&previous_tkey_guard, uuid);
let transaction = OpenTransaction::new(&previous_tkey_guard, uuid, start_timestamp);
// free state for readers again
drop(previous_tkey_guard);

View File

@@ -28,6 +28,7 @@ use std::{
io::{Cursor, Seek, SeekFrom, Write},
sync::Arc,
};
use uuid::Uuid;
use crate::metadata::{IoxMetadata, IoxParquetMetaData, METADATA_KEY};
@@ -125,11 +126,23 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub struct Storage {
iox_object_store: Arc<IoxObjectStore>,
fixed_uuid: Option<Uuid>,
}
impl Storage {
pub fn new(iox_object_store: Arc<IoxObjectStore>) -> Self {
Self { iox_object_store }
Self {
iox_object_store,
fixed_uuid: None,
}
}
/// Create new instance for testing w/ a fixed UUID.
pub fn new_for_testing(iox_object_store: Arc<IoxObjectStore>, uuid: Uuid) -> Self {
Self {
iox_object_store,
fixed_uuid: Some(uuid),
}
}
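A one-line usage sketch (variable names are illustrative): every parquet file written through this `Storage` gets a path derived from the same fixed UUID, so repeated test runs produce identical object-store keys.

// Illustrative: a storage instance whose generated parquet paths are deterministic.
let storage = Storage::new_for_testing(Arc::clone(&iox_object_store), Uuid::nil());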
/// Write the given stream of data of a specified table of
@@ -144,7 +157,10 @@ impl Storage {
metadata: IoxMetadata,
) -> Result<(ParquetFilePath, usize, IoxParquetMetaData)> {
// Create full path location of this file in object store
let path = ParquetFilePath::new(&chunk_addr);
let path = match self.fixed_uuid {
Some(uuid) => ParquetFilePath::new_for_testing(&chunk_addr, uuid),
None => ParquetFilePath::new(&chunk_addr),
};
let schema = stream.schema();
let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
@@ -579,6 +595,7 @@ mod tests {
schema.clone(),
addr,
column_summaries.clone(),
TestSize::Full,
)
.await;

View File

@@ -37,6 +37,7 @@ use persistence_windows::{
};
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};
use uuid::Uuid;
#[derive(Debug, Snafu)]
pub enum Error {
@@ -58,6 +59,10 @@ pub enum TestSize {
}
impl TestSize {
pub fn is_minimal(&self) -> bool {
matches!(self, Self::Minimal)
}
pub fn is_full(&self) -> bool {
matches!(self, Self::Full)
}
@@ -126,6 +131,7 @@ pub async fn make_chunk(
schema,
addr,
column_summaries,
test_size,
)
.await
}
@@ -138,7 +144,7 @@ pub async fn make_chunk_no_row_group(
test_size: TestSize,
) -> ParquetChunk {
let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix, test_size);
make_chunk_given_record_batch(store, vec![], schema, addr, column_summaries).await
make_chunk_given_record_batch(store, vec![], schema, addr, column_summaries, test_size).await
}
/// Create a test chunk by writing data to object store.
@@ -150,8 +156,13 @@ pub async fn make_chunk_given_record_batch(
schema: Schema,
addr: ChunkAddr,
column_summaries: Vec<ColumnSummary>,
test_size: TestSize,
) -> ParquetChunk {
let storage = Storage::new(Arc::clone(&iox_object_store));
let storage = if test_size.is_minimal() {
Storage::new_for_testing(Arc::clone(&iox_object_store), Uuid::nil())
} else {
Storage::new(Arc::clone(&iox_object_store))
};
let table_summary = TableSummary {
name: addr.table_name.to_string(),
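Putting the pieces together, a hedged end-to-end sketch; `make_chunk`'s full parameter list is only partially visible in this diff, and `chunk_addr(1)` stands in for whatever test helper builds a `ChunkAddr`. Requesting `TestSize::Minimal` routes chunk creation through the nil-UUID storage above, so the resulting chunk's parquet path is reproducible across runs.

// Illustrative only: assumed call shape (object store, column prefix, chunk address, test size).
let chunk = make_chunk(
    Arc::clone(&iox_object_store),
    "foo",
    chunk_addr(1),
    TestSize::Minimal, // minimal size => Storage::new_for_testing => deterministic paths
)
.await;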