refactor: move lifecycle implementations out of db.rs and into their own modules (#1858)

* refactor: move lifecycle implementations out of db.rs and into their own modules

* fix: clippy
Andrew Lamb 2021-06-30 13:24:04 -04:00 committed by GitHub
parent 9e1723620c
commit 817a480cde
6 changed files with 391 additions and 338 deletions
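For orientation, here is a sketch of how the pieces land after this refactor, assembled only from the diffs below (the scraped page does not show file names, so paths are omitted): the lifecycle module gains four submodules, and Db keeps thin async wrappers that delegate to them.

    // db lifecycle module root (sketch; the real declarations appear in the second diff below)
    mod compact;
    pub mod error;              // lifecycle-specific Error/Result with exported snafu selectors
    pub(crate) mod move_chunk;  // move_chunk_to_read_buffer_impl
    pub(crate) mod unload;      // unload_read_buffer_impl
    pub(crate) mod write;       // write_chunk_to_object_store_impl

Db::move_chunk_to_read_buffer, Db::write_chunk_to_object_store, and Db::unload_read_buffer now lock the chunk, call into these modules, and map the new lifecycle::error::Error back into db::Error::LifecycleError.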

View File

@@ -15,44 +15,33 @@ use crate::{
     write_buffer::WriteBuffer,
     JobRegistry,
 };
-use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
-use arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use ::lifecycle::LockableChunk;
 use async_trait::async_trait;
 use chrono::Utc;
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkSummary},
+    chunk_metadata::ChunkSummary,
     database_rules::DatabaseRules,
-    job::Job,
     partition_metadata::{PartitionSummary, TableSummary},
     server_id::ServerId,
 };
-use datafusion::{
-    catalog::{catalog::CatalogProvider, schema::SchemaProvider},
-    physical_plan::SendableRecordBatchStream,
-};
+use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
 use entry::{Entry, SequencedEntry};
-use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
 use metrics::KeyValue;
 use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
 use mutable_buffer::persistence_windows::PersistenceWindows;
 use object_store::{path::parsed::DirsAndFileName, ObjectStore};
-use observability_deps::tracing::{debug, error, info, warn};
+use observability_deps::tracing::{debug, error, info};
 use parking_lot::RwLock;
 use parquet_file::{
     catalog::{CheckpointData, PreservedCatalog},
-    chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
-    metadata::IoxMetadata,
-    storage::Storage,
 };
 use query::{exec::Executor, predicate::Predicate, QueryDatabase};
 use rand_distr::{Distribution, Poisson};
-use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ensure, ResultExt, Snafu};
 use std::{
     any::Any,
     collections::HashMap,
-    future::Future,
     num::NonZeroUsize,
     sync::{
         atomic::{AtomicUsize, Ordering},
@@ -60,7 +49,6 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
 pub mod access;
 pub mod catalog;
@@ -72,6 +60,7 @@ mod process_clock;
 mod streams;
 mod system_tables;
 
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(context(false))]
@@ -81,48 +70,10 @@ pub enum Error {
     PartitionError { source: catalog::partition::Error },
 
     #[snafu(display("Lifecycle error: {}", source))]
-    LifecycleError { source: catalog::chunk::Error },
+    LifecycleError { source: lifecycle::error::Error },
 
-    #[snafu(display(
-        "Can not drop chunk {}:{}:{} which has an in-progress lifecycle action {}. Wait for this to complete",
-        partition_key,
-        table_name,
-        chunk_id,
-        action
-    ))]
-    DropMovingChunk {
-        partition_key: String,
-        table_name: String,
-        chunk_id: u32,
-        action: String,
-    },
-
-    #[snafu(display("Read Buffer Error in chunk {}{} : {}", chunk_id, table_name, source))]
-    ReadBufferChunkError {
-        source: read_buffer::Error,
-        table_name: String,
-        chunk_id: u32,
-    },
-
-    #[snafu(display(
-        "Read Buffer Schema Error in chunk {}:{} : {}",
-        chunk_id,
-        table_name,
-        source
-    ))]
-    ReadBufferChunkSchemaError {
-        source: read_buffer::Error,
-        table_name: String,
-        chunk_id: u32,
-    },
-
-    #[snafu(display("Error writing to object store: {}", source))]
-    WritingToObjectStore {
-        source: parquet_file::storage::Error,
-    },
-
-    #[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))]
-    UnknownMutableBufferChunk { chunk_id: u32 },
+    #[snafu(display("Error freezing chunk while rolling over partition: {}", source))]
+    FreezingChunk { source: catalog::chunk::Error },
 
     #[snafu(display("Error sending entry to write buffer"))]
     WriteBufferError {
@@ -157,22 +108,6 @@ pub enum Error {
     #[snafu(display("Error building sequenced entry: {}", source))]
     SequencedEntryError { source: entry::SequencedEntryError },
 
-    #[snafu(display("Error while creating parquet chunk: {}", source))]
-    ParquetChunkError { source: parquet_file::chunk::Error },
-
-    #[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
-    TransactionError {
-        source: parquet_file::catalog::Error,
-    },
-
-    #[snafu(display("Error while commiting transaction on preserved catalog: {}", source))]
-    CommitError {
-        source: parquet_file::catalog::Error,
-    },
-
-    #[snafu(display("Cannot write chunk: {}", addr))]
-    CannotWriteChunk { addr: ChunkAddr },
-
     #[snafu(display("background task cancelled: {}", source))]
     TaskCancelled { source: futures::future::Aborted },
@@ -369,7 +304,7 @@ impl Db {
         if let Some(chunk) = chunk {
             let mut chunk = chunk.write();
-            chunk.freeze().context(LifecycleError)?;
+            chunk.freeze().context(FreezingChunk)?;
 
             Ok(Some(DbChunk::snapshot(&chunk)))
         } else {
@@ -430,79 +365,9 @@ impl Db {
         chunk_id: u32,
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
-        let (_, fut) = Self::move_chunk_to_read_buffer_impl(chunk.write())?;
-        fut.await.context(TaskCancelled)?
-    }
-
-    /// The implementation for moving a chunk to the read buffer
-    ///
-    /// Returns a future registered with the tracker registry, and the corresponding tracker
-    /// The caller can either spawn this future to tokio, or block directly on it
-    fn move_chunk_to_read_buffer_impl(
-        mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
-    ) -> Result<(
-        TaskTracker<Job>,
-        TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
-    )> {
-        let db = guard.data().db;
-        let addr = guard.addr().clone();
-        // TODO: Use ChunkAddr within Job
-        let (tracker, registration) = db.jobs.register(Job::CloseChunk {
-            db_name: addr.db_name.to_string(),
-            partition_key: addr.partition_key.to_string(),
-            table_name: addr.table_name.to_string(),
-            chunk_id: addr.chunk_id,
-        });
-
-        // update the catalog to say we are processing this chunk and
-        // then drop the lock while we do the work
-        let (mb_chunk, table_summary) = {
-            let mb_chunk = guard.set_moving(&registration).context(LifecycleError)?;
-            (mb_chunk, guard.table_summary())
-        };
-
-        // Drop locks
-        let chunk = guard.unwrap().chunk;
-
-        // create a new read buffer chunk with memory tracking
-        let metrics = db
-            .metrics_registry
-            .register_domain_with_labels("read_buffer", db.metric_labels.clone());
-
-        let mut rb_chunk = RBChunk::new(
-            &table_summary.name,
-            ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
-        );
-
-        let fut = async move {
-            info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer");
-
-            // load table into the new chunk one by one.
-            debug!(chunk=%addr, "loading table to read buffer");
-            let batch = mb_chunk
-                .read_filter(Selection::All)
-                // It is probably reasonable to recover from this error
-                // (reset the chunk state to Open) but until that is
-                // implemented (and tested) just panic
-                .expect("Loading chunk to mutable buffer");
-
-            let sorted = sort_record_batch(batch).expect("failed to sort");
-            rb_chunk.upsert_table(&table_summary.name, sorted);
-
-            // Can drop and re-acquire as lifecycle action prevents concurrent modification
-            let mut guard = chunk.write();
-
-            // update the catalog to say we are done processing
-            guard
-                .set_moved(Arc::new(rb_chunk))
-                .expect("failed to move chunk");
-
-            debug!(chunk=%addr, "chunk marked MOVED. loading complete");
-
-            Ok(DbChunk::snapshot(&guard))
-        };
-
-        Ok((tracker, fut.track(registration)))
+        let (_, fut) = lifecycle::move_chunk::move_chunk_to_read_buffer_impl(chunk.write())
+            .context(LifecycleError)?;
+        fut.await.context(TaskCancelled)?.context(LifecycleError)
     }
 
     /// Write given table of a given chunk to object store.
@@ -514,170 +379,9 @@ impl Db {
         chunk_id: u32,
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
-        let (_, fut) = Self::write_chunk_to_object_store_impl(chunk.write())?;
-        fut.await.context(TaskCancelled)?
-    }
-
-    /// The implementation for writing a chunk to the object store
-    ///
-    /// Returns a future registered with the tracker registry, and the corresponding tracker
-    /// The caller can either spawn this future to tokio, or block directly on it
-    fn write_chunk_to_object_store_impl(
-        mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
-    ) -> Result<(
-        TaskTracker<Job>,
-        TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
-    )> {
-        let db = guard.data().db;
-        let addr = guard.addr().clone();
-        // TODO: Use ChunkAddr within Job
-        let (tracker, registration) = db.jobs.register(Job::WriteChunk {
-            db_name: addr.db_name.to_string(),
-            partition_key: addr.partition_key.to_string(),
-            table_name: addr.table_name.to_string(),
-            chunk_id: addr.chunk_id,
-        });
-
-        // update the catalog to say we are processing this chunk and
-        let rb_chunk = guard
-            .set_writing_to_object_store(&registration)
-            .context(LifecycleError)?;
-
-        debug!(chunk=%guard.addr(), "chunk marked WRITING , loading tables into object store");
-
-        // Create a storage to save data of this chunk
-        let storage = Storage::new(Arc::clone(&db.store), db.server_id);
-
-        let catalog_transactions_until_checkpoint = db
-            .rules
-            .read()
-            .lifecycle_rules
-            .catalog_transactions_until_checkpoint
-            .get();
-        let preserved_catalog = Arc::clone(&db.preserved_catalog);
-        let catalog = Arc::clone(&db.catalog);
-        let object_store = Arc::clone(&db.store);
-        let cleanup_lock = Arc::clone(&db.cleanup_lock);
-
-        // Drop locks
-        let chunk = guard.unwrap().chunk;
-
-        let fut = async move {
-            debug!(chunk=%addr, "loading table to object store");
-
-            let predicate = read_buffer::Predicate::default();
-
-            // Get RecordBatchStream of data from the read buffer chunk
-            let read_results = rb_chunk.read_filter(&addr.table_name, predicate, Selection::All);
-
-            let arrow_schema: ArrowSchemaRef = rb_chunk
-                .read_filter_table_schema(Selection::All)
-                .expect("read buffer is infallible")
-                .into();
-            let stream: SendableRecordBatchStream = Box::pin(
-                streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
-            );
-
-            // check that the upcoming state change will very likely succeed
-            {
-                // re-lock
-                let guard = chunk.read();
-                if matches!(guard.stage(), &ChunkStage::Persisted { .. })
-                    || !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting)
-                {
-                    return Err(Error::CannotWriteChunk {
-                        addr: guard.addr().clone(),
-                    });
-                }
-            }
-
-            // catalog-level transaction for preservation layer
-            {
-                // fetch shared (= read) guard preventing the cleanup job from deleting our files
-                let _guard = cleanup_lock.read().await;
-
-                // Write this table data into the object store
-                //
-                // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
-                // between creation and the transaction commit.
-                let metadata = IoxMetadata {
-                    creation_timestamp: Utc::now(),
-                    table_name: addr.table_name.to_string(),
-                    partition_key: addr.partition_key.to_string(),
-                    chunk_id: addr.chunk_id,
-                };
-                let (path, parquet_metadata) = storage
-                    .write_to_object_store(addr, stream, metadata)
-                    .await
-                    .context(WritingToObjectStore)?;
-                let parquet_metadata = Arc::new(parquet_metadata);
-
-                let metrics = catalog
-                    .metrics_registry
-                    .register_domain_with_labels("parquet", catalog.metric_labels.clone());
-                let metrics =
-                    ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
-                let parquet_chunk = Arc::new(
-                    ParquetChunk::new(
-                        path.clone(),
-                        object_store,
-                        Arc::clone(&parquet_metadata),
-                        metrics,
-                    )
-                    .context(ParquetChunkError)?,
-                );
-
-                let path: DirsAndFileName = path.into();
-
-                // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the
-                // transaction lock (that is part of the PreservedCatalog) for too long. By using the
-                // cleanup lock (see above) it is ensured that the file that we have written is not deleted
-                // in between.
-                let mut transaction = preserved_catalog.open_transaction().await;
-                transaction
-                    .add_parquet(&path, &parquet_metadata)
-                    .context(TransactionError)?;
-
-                // preserved commit
-                let ckpt_handle = transaction.commit().await.context(CommitError)?;
-
-                // in-mem commit
-                {
-                    let mut guard = chunk.write();
-                    if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
-                        panic!("Chunk written but cannot mark as written {}", e);
-                    }
-                }
-
-                let create_checkpoint =
-                    ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0;
-                if create_checkpoint {
-                    // Commit is already done, so we can just scan the catalog for the state.
-                    //
-                    // NOTE: There can only be a single transaction in this section because the checkpoint handle holds
-                    // transaction lock. Therefore we don't need to worry about concurrent modifications of
-                    // preserved chunks.
-                    if let Err(e) = ckpt_handle
-                        .create_checkpoint(checkpoint_data_from_catalog(&catalog))
-                        .await
-                    {
-                        warn!(%e, "cannot create catalog checkpoint");
-
-                        // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
-                        // (both in-mem and within the preserved catalog).
-                    }
-                }
-            }
-
-            // We know this chunk is ParquetFile type
-            let chunk = chunk.read();
-            Ok(DbChunk::parquet_file_snapshot(&chunk))
-        };
-
-        Ok((tracker, fut.track(registration)))
+        let (_, fut) = lifecycle::write::write_chunk_to_object_store_impl(chunk.write())
+            .context(LifecycleError)?;
+        fut.await.context(TaskCancelled)?.context(LifecycleError)
     }
 
     /// Unload chunk from read buffer but keep it in object store
@@ -689,21 +393,7 @@ impl Db {
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
         let chunk = chunk.write();
-        Self::unload_read_buffer_impl(chunk)
-    }
-
-    pub fn unload_read_buffer_impl(
-        mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
-    ) -> Result<Arc<DbChunk>> {
-        debug!(chunk=%chunk.addr(), "unloading chunk from read buffer");
-
-        chunk
-            .set_unload_from_read_buffer()
-            .context(LifecycleError {})?;
-
-        debug!(chunk=%chunk.addr(), "chunk marked UNLOADED from read buffer");
-
-        Ok(DbChunk::snapshot(&chunk))
+        lifecycle::unload::unload_read_buffer_impl(chunk).context(LifecycleError)
     }
 
     /// Return chunk summary information for all chunks in the specified
@@ -770,7 +460,7 @@ impl Db {
             self.worker_iterations_lifecycle
                 .fetch_add(1, Ordering::Relaxed);
             tokio::select! {
-                _ = policy.check_for_work(chrono::Utc::now(), std::time::Instant::now()) => {},
+                _ = policy.check_for_work(Utc::now(), std::time::Instant::now()) => {},
                 _ = shutdown.cancelled() => break,
             }
         }
@@ -1051,7 +741,7 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
     for chunk in catalog.chunks() {
         let guard = chunk.read();
-        if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() {
+        if let ChunkStage::Persisted { parquet, .. } = guard.stage() {
             let path: DirsAndFileName = parquet.path().into();
             files.insert(path, parquet.parquet_metadata());
         }
@@ -1107,8 +797,8 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
-    use chrono::Utc;
     use futures::{stream, StreamExt, TryStreamExt};
+    use internal_types::selection::Selection;
    use mutable_buffer::persistence_windows::MinMaxSequence;
     use tokio_util::sync::CancellationToken;
@@ -1133,15 +823,15 @@ mod tests {
     use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
 
     use crate::{
-        db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
+        db::{
+            catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
+            test_helpers::{try_write_lp, write_lp},
+        },
         utils::{make_db, TestDb},
         write_buffer::test_helpers::MockBuffer,
     };
 
-    use super::{
-        test_helpers::{try_write_lp, write_lp},
-        *,
-    };
+    use super::*;
 
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = Error> = std::result::Result<T, E>;

View File

@@ -19,7 +19,12 @@ use crate::db::catalog::partition::Partition;
 use crate::Db;
 
 mod compact;
+pub mod error;
+pub(crate) mod move_chunk;
+pub(crate) mod unload;
+pub(crate) mod write;
+use error::{Error, Result};
 
 ///
 /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
 ///
@@ -38,8 +43,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
 
     type Job = Job;
 
-    // TODO: Separate error enumeration for lifecycle actions - db::Error is large
-    type Error = super::Error;
+    type Error = Error;
 
     fn read(&self) -> LifecycleReadGuard<'_, Self::Chunk, Self> {
         LifecycleReadGuard::new(self.clone(), self.chunk.as_ref())
@@ -53,7 +57,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
         s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
     ) -> Result<TaskTracker<Self::Job>, Self::Error> {
         info!(chunk=%s.addr(), "move to read buffer");
-        let (tracker, fut) = Db::move_chunk_to_read_buffer_impl(s)?;
+        let (tracker, fut) = move_chunk::move_chunk_to_read_buffer_impl(s)?;
         let _ = tokio::spawn(async move { fut.await.log_if_error("move to read buffer") });
         Ok(tracker)
     }
@@ -62,7 +66,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
         s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
     ) -> Result<TaskTracker<Self::Job>, Self::Error> {
         info!(chunk=%s.addr(), "writing to object store");
-        let (tracker, fut) = Db::write_chunk_to_object_store_impl(s)?;
+        let (tracker, fut) = write::write_chunk_to_object_store_impl(s)?;
         let _ = tokio::spawn(async move { fut.await.log_if_error("writing to object store") });
         Ok(tracker)
     }
@@ -72,7 +76,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
     ) -> Result<(), Self::Error> {
         info!(chunk=%s.addr(), "unloading from readbuffer");
-        let _ = Db::unload_read_buffer_impl(s)?;
+        let _ = self::unload::unload_read_buffer_impl(s)?;
         Ok(())
     }
 }

View File

@@ -0,0 +1,53 @@
//! Errors that can occur during lifecycle actions
use data_types::chunk_metadata::ChunkAddr;
use snafu::Snafu;
#[derive(Debug, Snafu)]
// Export the snafu "selectors" so they can be used in other modules
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(context(false))]
PartitionError {
source: crate::db::catalog::partition::Error,
},
#[snafu(display("Lifecycle error: {}", source))]
LifecycleError {
source: crate::db::catalog::chunk::Error,
},
#[snafu(display("Read Buffer Error in chunk {}{} : {}", chunk_id, table_name, source))]
ReadBufferChunkError {
source: read_buffer::Error,
table_name: String,
chunk_id: u32,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore {
source: parquet_file::storage::Error,
},
#[snafu(display("Error sending entry to write buffer"))]
WriteBufferError {
source: Box<dyn std::error::Error + Sync + Send>,
},
#[snafu(display("Error while creating parquet chunk: {}", source))]
ParquetChunkError { source: parquet_file::chunk::Error },
#[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
TransactionError {
source: parquet_file::catalog::Error,
},
#[snafu(display("Error while commiting transaction on preserved catalog: {}", source))]
CommitError {
source: parquet_file::catalog::Error,
},
#[snafu(display("Cannot write chunk: {}", addr))]
CannotWriteChunk { addr: ChunkAddr },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
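Because the selectors are exported with #[snafu(visibility = "pub")], the sibling lifecycle modules attach these variants with .context(...) instead of constructing them by hand. A minimal sketch of that pattern (the wrapper function is hypothetical; the selector usage mirrors the modules below):

    use snafu::ResultExt;
    use super::error::{LifecycleError, Result};

    // Hypothetical helper: lift a catalog-level chunk error into the lifecycle Error.
    fn freeze_chunk(chunk: &mut crate::db::catalog::chunk::CatalogChunk) -> Result<()> {
        // `LifecycleError` here is the generated snafu context selector, not the enum variant itself.
        chunk.freeze().context(LifecycleError)?;
        Ok(())
    }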

View File

@@ -0,0 +1,87 @@
use crate::db::catalog::chunk::CatalogChunk;
pub(crate) use crate::db::chunk::DbChunk;
use ::lifecycle::LifecycleWriteGuard;
use data_types::job::Job;
use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
use observability_deps::tracing::{debug, info};
use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
use snafu::ResultExt;
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::{
error::{LifecycleError, Result},
LockableCatalogChunk,
};
/// The implementation for moving a chunk to the read buffer
///
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub fn move_chunk_to_read_buffer_impl(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = guard.data().db;
let addr = guard.addr().clone();
// TODO: Use ChunkAddr within Job
let (tracker, registration) = db.jobs.register(Job::CloseChunk {
db_name: addr.db_name.to_string(),
partition_key: addr.partition_key.to_string(),
table_name: addr.table_name.to_string(),
chunk_id: addr.chunk_id,
});
// update the catalog to say we are processing this chunk and
// then drop the lock while we do the work
let (mb_chunk, table_summary) = {
let mb_chunk = guard.set_moving(&registration).context(LifecycleError)?;
(mb_chunk, guard.table_summary())
};
// Drop locks
let chunk = guard.unwrap().chunk;
// create a new read buffer chunk with memory tracking
let metrics = db
.metrics_registry
.register_domain_with_labels("read_buffer", db.metric_labels.clone());
let mut rb_chunk = RBChunk::new(
&table_summary.name,
ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
);
let fut = async move {
info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer");
// load table into the new chunk one by one.
debug!(chunk=%addr, "loading table to read buffer");
let batch = mb_chunk
.read_filter(Selection::All)
// It is probably reasonable to recover from this error
// (reset the chunk state to Open) but until that is
// implemented (and tested) just panic
.expect("Loading chunk to mutable buffer");
let sorted = sort_record_batch(batch).expect("failed to sort");
rb_chunk.upsert_table(&table_summary.name, sorted);
// Can drop and re-acquire as lifecycle action prevents concurrent modification
let mut guard = chunk.write();
// update the catalog to say we are done processing
guard
.set_moved(Arc::new(rb_chunk))
.expect("failed to move chunk");
debug!(chunk=%addr, "chunk marked MOVED. loading complete");
Ok(DbChunk::snapshot(&guard))
};
Ok((tracker, fut.track(registration)))
}
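As the doc comment says, the caller can either block on the returned future or spawn it; both shapes appear in this commit. A condensed sketch (the wrapper function is illustrative, `chunk` is the LockableCatalogChunk returned by Db::lockable_chunk, and the LockableChunk trait is assumed to be in scope for `.write()`):

    // Block on the move, as Db::move_chunk_to_read_buffer does: the outer layer of the
    // awaited result reports task cancellation, the inner layer the lifecycle outcome.
    async fn move_and_wait(chunk: LockableCatalogChunk<'_>) -> Result<Arc<DbChunk>> {
        let (_tracker, fut) = move_chunk_to_read_buffer_impl(chunk.write())?;
        fut.await.expect("tracked task was aborted")
    }

    // Fire and forget, as LockableChunk::move_to_read_buffer does: spawn the future
    // and keep only the tracker so the job registry can report progress.
    // let (tracker, fut) = move_chunk_to_read_buffer_impl(chunk.write())?;
    // tokio::spawn(async move { fut.await.log_if_error("move to read buffer") });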

View File

@@ -0,0 +1,27 @@
//! This module contains the code to unload chunks from the read buffer
use snafu::ResultExt;
use std::sync::Arc;
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::debug;
use crate::db::{catalog::chunk::CatalogChunk, DbChunk};
use super::LockableCatalogChunk;
use super::error::{LifecycleError, Result};
pub fn unload_read_buffer_impl(
mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
) -> Result<Arc<DbChunk>> {
debug!(chunk=%chunk.addr(), "unloading chunk from read buffer");
chunk
.set_unload_from_read_buffer()
.context(LifecycleError {})?;
debug!(chunk=%chunk.addr(), "chunk marked UNLOADED from read buffer");
Ok(DbChunk::snapshot(&chunk))
}
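Unlike the move and write paths, unloading is a synchronous catalog transition: no data is produced, so there is no tracker or future, just a snapshot of the updated chunk. A sketch of the call shape shared by Db::unload_read_buffer and LockableChunk::unload_read_buffer (the wrapper function is illustrative and assumes the LockableChunk trait is in scope):

    fn unload(chunk: LockableCatalogChunk<'_>) -> Result<Arc<DbChunk>> {
        // LockableChunk::write() yields the LifecycleWriteGuard the function consumes.
        let guard = chunk.write();
        // Drops the read buffer payload, keeps the parquet representation, returns a snapshot.
        unload_read_buffer_impl(guard)
    }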

View File

@@ -0,0 +1,192 @@
//! This module contains the code to write chunks to the object store
use crate::db::{
catalog::chunk::{CatalogChunk, ChunkStage},
checkpoint_data_from_catalog,
lifecycle::LockableCatalogChunk,
streams, DbChunk,
};
use ::lifecycle::LifecycleWriteGuard;
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use chrono::Utc;
use data_types::job::Job;
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::selection::Selection;
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{debug, warn};
use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
metadata::IoxMetadata,
storage::Storage,
};
use snafu::ResultExt;
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::error::{
CommitError, Error, LifecycleError, ParquetChunkError, Result, TransactionError,
WritingToObjectStore,
};
/// The implementation for writing a chunk to the object store
///
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub fn write_chunk_to_object_store_impl(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = guard.data().db;
let addr = guard.addr().clone();
// TODO: Use ChunkAddr within Job
let (tracker, registration) = db.jobs.register(Job::WriteChunk {
db_name: addr.db_name.to_string(),
partition_key: addr.partition_key.to_string(),
table_name: addr.table_name.to_string(),
chunk_id: addr.chunk_id,
});
// update the catalog to say we are processing this chunk and
let rb_chunk = guard
.set_writing_to_object_store(&registration)
.context(LifecycleError)?;
debug!(chunk=%guard.addr(), "chunk marked WRITING, loading tables into object store");
// Create a storage to save data of this chunk
let storage = Storage::new(Arc::clone(&db.store), db.server_id);
let catalog_transactions_until_checkpoint = db
.rules
.read()
.lifecycle_rules
.catalog_transactions_until_checkpoint
.get();
let preserved_catalog = Arc::clone(&db.preserved_catalog);
let catalog = Arc::clone(&db.catalog);
let object_store = Arc::clone(&db.store);
let cleanup_lock = Arc::clone(&db.cleanup_lock);
// Drop locks
let chunk = guard.unwrap().chunk;
let fut = async move {
debug!(chunk=%addr, "loading table to object store");
let predicate = read_buffer::Predicate::default();
// Get RecordBatchStream of data from the read buffer chunk
let read_results = rb_chunk.read_filter(&addr.table_name, predicate, Selection::All);
let arrow_schema: ArrowSchemaRef = rb_chunk
.read_filter_table_schema(Selection::All)
.expect("read buffer is infallible")
.into();
let stream: SendableRecordBatchStream = Box::pin(streams::ReadFilterResultsStream::new(
read_results,
Arc::clone(&arrow_schema),
));
// check that the upcoming state change will very likely succeed
{
// re-lock
let guard = chunk.read();
if matches!(guard.stage(), &ChunkStage::Persisted { .. })
|| !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting)
{
return Err(Error::CannotWriteChunk {
addr: guard.addr().clone(),
});
}
}
// catalog-level transaction for preservation layer
{
// fetch shared (= read) guard preventing the cleanup job from deleting our files
let _guard = cleanup_lock.read().await;
// Write this table data into the object store
//
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: addr.table_name.to_string(),
partition_key: addr.partition_key.to_string(),
chunk_id: addr.chunk_id,
};
let (path, parquet_metadata) = storage
.write_to_object_store(addr, stream, metadata)
.await
.context(WritingToObjectStore)?;
let parquet_metadata = Arc::new(parquet_metadata);
let metrics = catalog
.metrics_registry
.register_domain_with_labels("parquet", catalog.metric_labels.clone());
let metrics = ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
let parquet_chunk = Arc::new(
ParquetChunk::new(
path.clone(),
object_store,
Arc::clone(&parquet_metadata),
metrics,
)
.context(ParquetChunkError)?,
);
let path: DirsAndFileName = path.into();
// IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the
// transaction lock (that is part of the PreservedCatalog) for too long. By using the
// cleanup lock (see above) it is ensured that the file that we have written is not deleted
// in between.
let mut transaction = preserved_catalog.open_transaction().await;
transaction
.add_parquet(&path, &parquet_metadata)
.context(TransactionError)?;
// preserved commit
let ckpt_handle = transaction.commit().await.context(CommitError)?;
// in-mem commit
{
let mut guard = chunk.write();
if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
panic!("Chunk written but cannot mark as written {}", e);
}
}
let create_checkpoint =
ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0;
if create_checkpoint {
// Commit is already done, so we can just scan the catalog for the state.
//
// NOTE: There can only be a single transaction in this section because the checkpoint handle holds
// transaction lock. Therefore we don't need to worry about concurrent modifications of
// preserved chunks.
if let Err(e) = ckpt_handle
.create_checkpoint(checkpoint_data_from_catalog(&catalog))
.await
{
warn!(%e, "cannot create catalog checkpoint");
// That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
// (both in-mem and within the preserved catalog).
}
}
}
// We know this chunk is ParquetFile type
let chunk = chunk.read();
Ok(DbChunk::parquet_file_snapshot(&chunk))
};
Ok((tracker, fut.track(registration)))
}
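The function above is deliberate about ordering: it takes the shared cleanup lock, writes the parquet file, and only then opens and commits the preserved-catalog transaction, so the transaction lock is held briefly and the cleanup job cannot delete the freshly written file in between. The checkpoint decision at the end is a simple modulus on the committed revision; a self-contained sketch of just that arithmetic (the threshold value is made up, in the real code it comes from lifecycle_rules.catalog_transactions_until_checkpoint):

    // Sketch: a checkpoint is taken whenever the committed revision number is a
    // multiple of the configured transaction count (e.g. every 100 commits).
    fn should_checkpoint(revision_counter: u64, transactions_until_checkpoint: u64) -> bool {
        revision_counter % transactions_until_checkpoint == 0
    }

    fn main() {
        assert!(should_checkpoint(200, 100));  // revision 200 with threshold 100: checkpoint
        assert!(!should_checkpoint(201, 100)); // revision 201: skip
    }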