Merge branch 'main' into crepererum/better_served_uninit_error

pull/24376/head
kodiakhq[bot] 2021-06-23 08:54:48 +00:00 committed by GitHub
commit d94a9ea94a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 553 additions and 728 deletions

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@ use std::{
}; };
use crate::{ use crate::{
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog, TransactionEnd}, catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
storage::data_location, storage::data_location,
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
@ -39,17 +39,13 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// ///
/// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held /// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held
/// use `max_files` which will limit the number of files to delete in this cleanup round. /// use `max_files` which will limit the number of files to delete in this cleanup round.
pub async fn cleanup_unreferenced_parquet_files<S>( pub async fn cleanup_unreferenced_parquet_files(
catalog: &PreservedCatalog, catalog: &PreservedCatalog,
state: Arc<S>,
max_files: usize, max_files: usize,
) -> Result<()> ) -> Result<()> {
where
S: CatalogState + Send + Sync,
{
// Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there // Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there
// that are about to get added to the catalog. // that are about to get added to the catalog.
let transaction = catalog.open_transaction(state).await; let transaction = catalog.open_transaction().await;
let store = catalog.object_store(); let store = catalog.object_store();
let server_id = catalog.server_id(); let server_id = catalog.server_id();
@ -130,35 +126,19 @@ impl CatalogState for TracerCatalogState {
} }
} }
type TransactionState = Arc<Self>;
fn transaction_begin(origin: &Arc<Self>) -> Self::TransactionState {
// no copy
Arc::clone(origin)
}
fn transaction_end(tstate: Self::TransactionState, _how: TransactionEnd) -> Arc<Self> {
// we don't care about aborts because they are not during clean-up
tstate
}
fn add( fn add(
tstate: &mut Self::TransactionState, &mut self,
_object_store: Arc<ObjectStore>, _object_store: Arc<ObjectStore>,
info: CatalogParquetInfo, info: CatalogParquetInfo,
) -> crate::catalog::Result<()> { ) -> crate::catalog::Result<()> {
tstate self.files
.files
.lock() .lock()
.expect("lock poissened?") .expect("lock poissened?")
.insert(info.path); .insert(info.path);
Ok(()) Ok(())
} }
fn remove( fn remove(&mut self, _path: DirsAndFileName) -> crate::catalog::Result<()> {
_tstate: &mut Self::TransactionState,
_path: DirsAndFileName,
) -> crate::catalog::Result<()> {
// Do NOT remove the file since we still need it for time travel // Do NOT remove the file since we still need it for time travel
Ok(()) Ok(())
} }
@ -184,7 +164,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = "db1"; let db_name = "db1";
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -194,7 +174,7 @@ mod tests {
.unwrap(); .unwrap();
// run clean-up // run clean-up
cleanup_unreferenced_parquet_files(&catalog, state, 1_000) cleanup_unreferenced_parquet_files(&catalog, 1_000)
.await .await
.unwrap(); .unwrap();
} }
@ -205,7 +185,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = db_name(); let db_name = db_name();
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -218,7 +198,7 @@ mod tests {
let mut paths_keep = vec![]; let mut paths_keep = vec![];
let mut paths_delete = vec![]; let mut paths_delete = vec![];
{ {
let mut transaction = catalog.open_transaction(state).await; let mut transaction = catalog.open_transaction().await;
// an ordinary tracked parquet file => keep // an ordinary tracked parquet file => keep
let (path, md) = make_metadata(&object_store, "foo", chunk_addr(1)).await; let (path, md) = make_metadata(&object_store, "foo", chunk_addr(1)).await;
@ -242,11 +222,11 @@ mod tests {
let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await; let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await;
paths_delete.push(path.display()); paths_delete.push(path.display());
state = transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
// run clean-up // run clean-up
cleanup_unreferenced_parquet_files(&catalog, state, 1_000) cleanup_unreferenced_parquet_files(&catalog, 1_000)
.await .await
.unwrap(); .unwrap();
@ -266,7 +246,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = db_name(); let db_name = db_name();
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -279,17 +259,17 @@ mod tests {
for i in 0..100 { for i in 0..100 {
let (path, _) = tokio::join!( let (path, _) = tokio::join!(
async { async {
let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; let mut transaction = catalog.open_transaction().await;
let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await; let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await;
transaction.add_parquet(&path.clone().into(), &md).unwrap(); transaction.add_parquet(&path.clone().into(), &md).unwrap();
transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
path.display() path.display()
}, },
async { async {
cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 1_000) cleanup_unreferenced_parquet_files(&catalog, 1_000)
.await .await
.unwrap(); .unwrap();
}, },
@ -306,7 +286,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = db_name(); let db_name = db_name();
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -323,7 +303,7 @@ mod tests {
} }
// run clean-up // run clean-up
cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2) cleanup_unreferenced_parquet_files(&catalog, 2)
.await .await
.unwrap(); .unwrap();
@ -333,7 +313,7 @@ mod tests {
assert_eq!(leftover.len(), 1); assert_eq!(leftover.len(), 1);
// run clean-up again // run clean-up again
cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2) cleanup_unreferenced_parquet_files(&catalog, 2)
.await .await
.unwrap(); .unwrap();

View File

@ -1,6 +1,7 @@
//! Contains code to rebuild a catalog from files. //! Contains code to rebuild a catalog from files.
use std::{ use std::{
collections::{hash_map::Entry, HashMap}, collections::{hash_map::Entry, HashMap},
fmt::Debug,
sync::Arc, sync::Arc,
}; };
@ -15,7 +16,7 @@ use snafu::{ResultExt, Snafu};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
catalog::{CatalogState, CheckpointData, PreservedCatalog}, catalog::{CatalogParquetInfo, CatalogState, CheckpointData, PreservedCatalog},
metadata::{IoxMetadata, IoxParquetMetaData}, metadata::{IoxMetadata, IoxParquetMetaData},
}; };
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
@ -55,6 +56,9 @@ pub enum Error {
#[snafu(display("Cannot commit transaction: {}", source))] #[snafu(display("Cannot commit transaction: {}", source))]
CommitFailure { source: crate::catalog::Error }, CommitFailure { source: crate::catalog::Error },
#[snafu(display("Cannot create checkpoint: {}", source))]
CheckpointFailure { source: crate::catalog::Error },
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -98,17 +102,22 @@ pub async fn rebuild_catalog<S>(
ignore_metadata_read_failure: bool, ignore_metadata_read_failure: bool,
) -> Result<(PreservedCatalog, Arc<S>)> ) -> Result<(PreservedCatalog, Arc<S>)>
where where
S: CatalogState + Send + Sync, S: CatalogState + Debug + Send + Sync,
{ {
// collect all revisions from parquet files // collect all revisions from parquet files
let mut revisions = let mut revisions =
collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?; collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?;
// create new empty catalog // create new empty catalog
let (catalog, mut state) = let (catalog, state) = PreservedCatalog::new_empty::<S>(
PreservedCatalog::new_empty::<S>(object_store, server_id, db_name, catalog_empty_input) Arc::clone(&object_store),
.await server_id,
.context(NewEmptyFailure)?; db_name,
catalog_empty_input,
)
.await
.context(NewEmptyFailure)?;
let mut state = Arc::try_unwrap(state).expect("dangling Arc?");
// trace all files for final checkpoint // trace all files for final checkpoint
let mut collected_files = HashMap::new(); let mut collected_files = HashMap::new();
@ -124,34 +133,45 @@ where
if let Some((uuid, entries)) = revisions.remove(&revision_counter) { if let Some((uuid, entries)) = revisions.remove(&revision_counter) {
// we have files for this particular transaction // we have files for this particular transaction
let mut transaction = catalog.open_transaction_with_uuid(uuid, state).await; let mut transaction = catalog.open_transaction_with_uuid(uuid).await;
for (path, metadata) in entries { for (path, metadata) in entries {
let path: DirsAndFileName = path.clone().into(); let path: DirsAndFileName = path.clone().into();
state
.add(
Arc::clone(&object_store),
CatalogParquetInfo {
path: path.clone(),
metadata: Arc::new(metadata.clone()),
},
)
.context(FileRecordFailure)?;
transaction transaction
.add_parquet(&path, &metadata) .add_parquet(&path, &metadata)
.context(FileRecordFailure)?; .context(FileRecordFailure)?;
collected_files.insert(path, Arc::new(metadata)); collected_files.insert(path, Arc::new(metadata));
} }
let checkpoint_data = (revision_counter == max_revision).then(|| CheckpointData { let ckpt_handle = transaction.commit().await.context(CommitFailure)?;
files: collected_files.clone(), if revision_counter == max_revision {
}); ckpt_handle
state = transaction .create_checkpoint(CheckpointData {
.commit(checkpoint_data) files: collected_files.clone(),
.await })
.context(CommitFailure)?; .await
.context(CommitFailure)?;
}
} else { } else {
// we do not have any files for this transaction (there might have been other actions though or it was // we do not have any files for this transaction (there might have been other actions though or it was
// an empty transaction) => create new empty transaction // an empty transaction) => create new empty transaction
// Note that this can never be the last transaction, so we don't need to create a checkpoint here. // Note that this can never be the last transaction, so we don't need to create a checkpoint here.
let transaction = catalog.open_transaction(state).await; let transaction = catalog.open_transaction().await;
state = transaction.commit(None).await.context(CommitFailure)?; transaction.commit().await.context(CheckpointFailure)?;
} }
} }
} }
Ok((catalog, state)) Ok((catalog, Arc::new(state)))
} }
/// Collect all files under the given locations. /// Collect all files under the given locations.
@ -278,7 +298,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build catalog with some data // build catalog with some data
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -286,8 +306,9 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let mut state = Arc::try_unwrap(state).unwrap();
{ {
let mut transaction = catalog.open_transaction(state).await; let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -298,6 +319,9 @@ mod tests {
0, 0,
) )
.await; .await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
@ -309,17 +333,20 @@ mod tests {
1, 1,
) )
.await; .await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
state = transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
{ {
// empty transaction // empty transaction
let transaction = catalog.open_transaction(state).await; let transaction = catalog.open_transaction().await;
state = transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
{ {
let mut transaction = catalog.open_transaction(state).await; let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -330,9 +357,12 @@ mod tests {
2, 2,
) )
.await; .await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
state = transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
// store catalog state // store catalog state
@ -459,7 +489,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build catalog with same data // build catalog with same data
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -468,7 +498,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
{ {
let mut transaction = catalog.open_transaction(state).await; let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -492,7 +522,7 @@ mod tests {
) )
.await; .await;
transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
// wipe catalog // wipe catalog
@ -577,7 +607,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build catalog with some data (2 transactions + initial empty one) // build catalog with some data (2 transactions + initial empty one)
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>( let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -585,8 +615,9 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let mut state = Arc::try_unwrap(state).unwrap();
{ {
let mut transaction = catalog.open_transaction(state).await; let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -597,12 +628,15 @@ mod tests {
0, 0,
) )
.await; .await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
state = transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
{ {
let mut transaction = catalog.open_transaction(state).await; let mut transaction = catalog.open_transaction().await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -613,9 +647,12 @@ mod tests {
2, 2,
) )
.await; .await;
state
.parquet_files
.insert(path.clone(), Arc::new(md.clone()));
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
state = transaction.commit(None).await.unwrap(); transaction.commit().await.unwrap();
} }
assert_eq!(catalog.revision_counter(), 2); assert_eq!(catalog.revision_counter(), 2);

View File

@ -12,6 +12,7 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use async_trait::async_trait; use async_trait::async_trait;
use catalog::{chunk::CatalogChunk, Catalog}; use catalog::{chunk::CatalogChunk, Catalog};
pub(crate) use chunk::DbChunk; pub(crate) use chunk::DbChunk;
use data_types::chunk_metadata::ChunkAddr;
use data_types::{ use data_types::{
chunk_metadata::ChunkSummary, chunk_metadata::ChunkSummary,
database_rules::DatabaseRules, database_rules::DatabaseRules,
@ -30,7 +31,7 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore}; use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info, warn}; use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::RwLock; use parking_lot::RwLock;
use parquet_file::catalog::{CheckpointData, TransactionEnd}; use parquet_file::catalog::CheckpointData;
use parquet_file::{ use parquet_file::{
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog}, catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk}, chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
@ -42,7 +43,7 @@ use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson}; use rand_distr::{Distribution, Poisson};
use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk}; use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::{ use std::{
any::Any, any::Any,
@ -143,11 +144,22 @@ pub enum Error {
#[snafu(display("Error building sequenced entry: {}", source))] #[snafu(display("Error building sequenced entry: {}", source))]
SequencedEntryError { source: entry::SequencedEntryError }, SequencedEntryError { source: entry::SequencedEntryError },
#[snafu(display("Error while creating parquet chunk: {}", source))]
ParquetChunkError { source: parquet_file::chunk::Error },
#[snafu(display("Error while handling transaction on preserved catalog: {}", source))] #[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
TransactionError { TransactionError {
source: parquet_file::catalog::Error, source: parquet_file::catalog::Error,
}, },
#[snafu(display("Error while commiting transaction on preserved catalog: {}", source))]
CommitError {
source: parquet_file::catalog::Error,
},
#[snafu(display("Cannot write chunk: {}", addr))]
CannotWriteChunk { addr: ChunkAddr },
#[snafu(display("background task cancelled: {}", source))] #[snafu(display("background task cancelled: {}", source))]
TaskCancelled { source: futures::future::Aborted }, TaskCancelled { source: futures::future::Aborted },
} }
@ -595,9 +607,6 @@ impl Db {
debug!(chunk=%guard.addr(), "chunk marked WRITING , loading tables into object store"); debug!(chunk=%guard.addr(), "chunk marked WRITING , loading tables into object store");
// Drop locks
let chunk = guard.unwrap().chunk;
// Create a storage to save data of this chunk // Create a storage to save data of this chunk
let storage = Storage::new( let storage = Storage::new(
Arc::clone(&db.store), Arc::clone(&db.store),
@ -613,6 +622,10 @@ impl Db {
let preserved_catalog = Arc::clone(&db.preserved_catalog); let preserved_catalog = Arc::clone(&db.preserved_catalog);
let catalog = Arc::clone(&db.catalog); let catalog = Arc::clone(&db.catalog);
let object_store = Arc::clone(&db.store);
// Drop locks
let chunk = guard.unwrap().chunk;
let fut = async move { let fut = async move {
let table_name = table_summary.name.as_str(); let table_name = table_summary.name.as_str();
@ -632,9 +645,22 @@ impl Db {
streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)), streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
); );
// check that the upcoming state change will very likely succeed
{
// re-lock
let guard = chunk.read();
if matches!(guard.stage(), &ChunkStage::Persisted { .. })
|| !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting)
{
return Err(Error::CannotWriteChunk {
addr: guard.addr().clone(),
});
}
}
// catalog-level transaction for preservation layer // catalog-level transaction for preservation layer
{ {
let mut transaction = preserved_catalog.open_transaction(catalog).await; let mut transaction = preserved_catalog.open_transaction().await;
// Write this table data into the object store // Write this table data into the object store
// //
@ -659,30 +685,60 @@ impl Db {
) )
.await .await
.context(WritingToObjectStore)?; .context(WritingToObjectStore)?;
let parquet_metadata = Arc::new(parquet_metadata);
let metrics = catalog
.metrics_registry
.register_domain_with_labels("parquet", catalog.metric_labels.clone());
let metrics =
ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
let parquet_chunk = Arc::new(
ParquetChunk::new(
path.clone(),
object_store,
Arc::clone(&parquet_metadata),
metrics,
)
.context(ParquetChunkError)?,
);
let path: DirsAndFileName = path.into(); let path: DirsAndFileName = path.into();
transaction transaction
.add_parquet(&path, &parquet_metadata) .add_parquet(&path, &parquet_metadata)
.context(TransactionError)?; .context(TransactionError)?;
// preserved commit
let ckpt_handle = transaction.commit().await.context(CommitError)?;
// in-mem commit
{
let mut guard = chunk.write();
if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
panic!("Chunk written but cannot mark as written {}", e);
}
}
let create_checkpoint = catalog_transactions_until_checkpoint let create_checkpoint = catalog_transactions_until_checkpoint
.map_or(false, |interval| { .map_or(false, |interval| {
transaction.revision_counter() % interval.get() == 0 ckpt_handle.revision_counter() % interval.get() == 0
}); });
let checkpoint_data = create_checkpoint.then(|| { if create_checkpoint {
let mut checkpoint_data = // Commit is already done, so we can just scan the catalog for the state.
checkpoint_data_from_catalog(&transaction.tstate().catalog); //
// don't forget the file that we've just added // NOTE: There can only be a single transaction in this section because the checkpoint handle holds
checkpoint_data // transaction lock. Therefore we don't need to worry about concurrent modifications of
.files // preserved chunks.
.insert(path, Arc::new(parquet_metadata)); if let Err(e) = ckpt_handle
checkpoint_data .create_checkpoint(checkpoint_data_from_catalog(&catalog))
}); .await
{
warn!(%e, "cannot create catalog checkpoint");
transaction // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
.commit(checkpoint_data) // (both in-mem and within the preserved catalog).
.await }
.context(TransactionError)?; }
} }
// We know this chunk is ParquetFile type // We know this chunk is ParquetFile type
@ -804,7 +860,7 @@ impl Db {
debug!(?duration, "cleanup worker sleeps"); debug!(?duration, "cleanup worker sleeps");
tokio::time::sleep(duration).await; tokio::time::sleep(duration).await;
if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, Arc::clone(&self.catalog), 1_000).await { if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await {
error!(%e, "error in background cleanup task"); error!(%e, "error in background cleanup task");
} }
} => {}, } => {},
@ -1016,47 +1072,6 @@ pub struct CatalogEmptyInput {
metric_labels: Vec<KeyValue>, metric_labels: Vec<KeyValue>,
} }
#[derive(Debug)]
enum TransactionCommitAction {
DropChunk {
table_name: String,
partition_key: String,
chunk_id: u32,
},
NewChunk {
table_name: String,
partition_key: String,
chunk_id: u32,
inner: Arc<ParquetChunk>,
},
SetWritten {
table_name: String,
partition_key: String,
chunk_id: u32,
inner: Arc<ParquetChunk>,
},
}
/// Helper to manage transaction on the in-memory catalog.
#[derive(Debug)]
pub struct TransactionState {
/// Inner catalog used during this transaction.
catalog: Arc<Catalog>,
/// Actions that will be performed on successful commit. These are pre-checked and should not result in any errors.
commit_actions: Vec<TransactionCommitAction>,
/// New files that are to be added during this transaction with table, partition key and chunk ID.
///
/// This only contains files that were not (yet) removed during the same transaction.
new_files: HashMap<DirsAndFileName, (String, String, u32)>,
/// Files removed during this transaction.
///
/// This only contains files that were not (yet) re-added during the same transaction.
removed_files: HashSet<DirsAndFileName>,
}
impl CatalogState for Catalog { impl CatalogState for Catalog {
type EmptyInput = CatalogEmptyInput; type EmptyInput = CatalogEmptyInput;
@ -1069,89 +1084,8 @@ impl CatalogState for Catalog {
) )
} }
type TransactionState = TransactionState;
fn transaction_begin(origin: &Arc<Self>) -> Self::TransactionState {
TransactionState {
catalog: Arc::clone(origin),
commit_actions: vec![],
new_files: HashMap::new(),
removed_files: HashSet::new(),
}
}
fn transaction_end(tstate: Self::TransactionState, how: TransactionEnd) -> Arc<Self> {
let TransactionState {
catalog,
commit_actions,
..
} = tstate;
if matches!(how, TransactionEnd::Commit) {
for action in commit_actions {
match action {
TransactionCommitAction::DropChunk {
table_name,
partition_key,
chunk_id,
} => {
// TODO: Should this really be infallible?
if let Ok(partition) = catalog.partition(&table_name, &partition_key) {
let mut partition = partition.write();
let _ = partition.drop_chunk(chunk_id);
}
debug!(%table_name, %partition_key, chunk_id, "removed chunk according to persisted catalog");
}
TransactionCommitAction::NewChunk {
table_name,
partition_key,
chunk_id,
inner,
} => {
let partition = catalog
.get_or_create_partition(table_name.clone(), partition_key.clone());
let mut partition = partition.write();
partition.insert_object_store_only_chunk(chunk_id, inner);
debug!(%table_name, %partition_key, chunk_id, "recovered chunk from persisted catalog");
}
TransactionCommitAction::SetWritten {
table_name,
partition_key,
chunk_id,
inner,
} => {
let partition = catalog
.get_or_create_partition(table_name.clone(), partition_key.clone());
let partition = partition.read();
match partition.chunk(chunk_id) {
Some(chunk) => {
let mut chunk = chunk.write();
match chunk.set_written_to_object_store(inner) {
Ok(()) => {
debug!(%table_name, %partition_key, chunk_id, "chunk marked WRITTEN. Persisting to object store complete");
}
Err(e) => {
warn!(%e, %table_name, %partition_key, chunk_id, "chunk state changed during transaction even though lifecycle action was present");
}
}
}
None => {
warn!(%table_name, %partition_key, chunk_id, "chunk is gone during transaction even though lifecycle action was present");
}
}
}
};
}
}
catalog
}
fn add( fn add(
tstate: &mut Self::TransactionState, &mut self,
object_store: Arc<ObjectStore>, object_store: Arc<ObjectStore>,
info: CatalogParquetInfo, info: CatalogParquetInfo,
) -> parquet_file::catalog::Result<()> { ) -> parquet_file::catalog::Result<()> {
@ -1166,13 +1100,11 @@ impl CatalogState for Catalog {
})?; })?;
// Create a parquet chunk for this chunk // Create a parquet chunk for this chunk
let metrics = tstate let metrics = self
.catalog
.metrics_registry .metrics_registry
.register_domain_with_labels("parquet", tstate.catalog.metric_labels.clone()); .register_domain_with_labels("parquet", self.metric_labels.clone());
let metrics = let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
ParquetChunkMetrics::new(&metrics, tstate.catalog.metrics().memory().parquet());
let parquet_chunk = ParquetChunk::new( let parquet_chunk = ParquetChunk::new(
object_store.path_from_dirs_and_filename(info.path.clone()), object_store.path_from_dirs_and_filename(info.path.clone()),
object_store, object_store,
@ -1186,128 +1118,45 @@ impl CatalogState for Catalog {
// Get partition from the catalog // Get partition from the catalog
// Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog. // Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog.
let partition = tstate let partition = self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
.catalog let mut partition = partition.write();
.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key); if partition.chunk(iox_md.chunk_id).is_some() {
let partition_guard = partition.read();
if tstate.new_files.contains_key(&info.path) {
return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path }); return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path });
} }
partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
// Get the chunk from the catalog
match (
tstate.removed_files.remove(&info.path),
partition_guard.chunk(iox_md.chunk_id),
) {
(false, Some(chunk)) => {
// Chunk exists => should be in frozen stage and will transition from there
// Relock the chunk again (nothing else should have been able
// to modify the chunk state while we were moving it
let chunk = chunk.read();
// check if chunk already exists
if matches!(chunk.stage(), &ChunkStage::Persisted { .. }) {
return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists {
path: info.path,
});
}
// check that the upcoming state change will very likely succeed
if !chunk.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting) {
return Err(parquet_file::catalog::Error::CatalogStateFailure {
source: Box::new(
crate::db::catalog::chunk::Error::UnexpectedLifecycleAction {
chunk: chunk.addr().clone(),
expected: "persisting".to_string(),
actual: chunk
.lifecycle_action()
.map_or("n/a", |action| action.metadata().name())
.to_string(),
},
),
path: info.path,
});
}
// update the catalog to say we are done processing
tstate
.commit_actions
.push(TransactionCommitAction::SetWritten {
table_name: iox_md.table_name.clone(),
partition_key: iox_md.partition_key.clone(),
chunk_id: iox_md.chunk_id,
inner: parquet_chunk,
});
tstate.new_files.insert(
info.path,
(iox_md.table_name, iox_md.partition_key, iox_md.chunk_id),
);
}
_ => {
// table unknown => that's ok, create chunk in "object store only" stage which will also create the table
// table chunk, but table already known => that's ok, create chunk in "object store only" stage
tstate
.commit_actions
.push(TransactionCommitAction::NewChunk {
table_name: iox_md.table_name.clone(),
partition_key: iox_md.partition_key.clone(),
chunk_id: iox_md.chunk_id,
inner: parquet_chunk,
});
tstate.new_files.insert(
info.path,
(iox_md.table_name, iox_md.partition_key, iox_md.chunk_id),
);
}
}
Ok(()) Ok(())
} }
fn remove( fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> {
tstate: &mut Self::TransactionState, let mut removed_any = false;
path: DirsAndFileName,
) -> parquet_file::catalog::Result<()> {
if tstate.removed_files.contains(&path) {
return Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path });
}
let mut actions: Vec<TransactionCommitAction> = vec![]; for partition in self.partitions() {
let mut partition = partition.write();
for partition in tstate.catalog.partitions() { let mut to_remove = vec![];
let partition = partition.read();
for chunk in partition.chunks() { for chunk in partition.chunks() {
let chunk = chunk.read(); let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
let chunk_path: DirsAndFileName = parquet.path().into(); let chunk_path: DirsAndFileName = parquet.path().into();
if path == chunk_path { if path == chunk_path {
actions.push(TransactionCommitAction::DropChunk { to_remove.push(chunk.id());
table_name: partition.table_name().to_string(),
partition_key: partition.key().to_string(),
chunk_id: chunk.id(),
});
} }
} }
} }
for chunk_id in to_remove {
if let Err(e) = partition.drop_chunk(chunk_id) {
panic!("Chunk is gone while we've had a partition lock: {}", e);
}
removed_any = true;
}
} }
if let Some((table_name, partition_key, chunk_id)) = tstate.new_files.remove(&path) { if removed_any {
actions.push(TransactionCommitAction::DropChunk {
table_name,
partition_key,
chunk_id,
});
}
if actions.is_empty() {
Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path })
} else {
tstate.commit_actions.append(&mut actions);
tstate.removed_files.insert(path);
Ok(()) Ok(())
} else {
Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path })
} }
} }
} }