feat: persist real (non-fake) partition + database checkpoints

pull/24376/head
Marco Neumann 2021-07-13 10:48:49 +02:00
parent 8276511bd3
commit 2b0a4bbe0a
4 changed files with 54 additions and 28 deletions

View File

@@ -471,8 +471,15 @@ impl Db {
chunk_id: u32,
) -> Result<Arc<DbChunk>> {
let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
let (_, fut) =
lifecycle::write_chunk_to_object_store(chunk.write()).context(LifecycleError)?;
let partition = self.partition(table_name, partition_key)?;
let (partition_checkpoint, database_checkpoint) =
lifecycle::collect_checkpoints(&partition, partition_key, table_name, &self.catalog);
let (_, fut) = lifecycle::write_chunk_to_object_store(
chunk.write(),
partition_checkpoint,
database_checkpoint,
)
.context(LifecycleError)?;
fut.await.context(TaskCancelled)?.context(LifecycleError)
}

View File

@@ -32,7 +32,7 @@ pub(crate) use error::{Error, Result};
pub(crate) use move_chunk::move_chunk_to_read_buffer;
use persistence_windows::persistence_windows::FlushHandle;
pub(crate) use unload::unload_read_buffer_chunk;
pub(crate) use write::write_chunk_to_object_store;
pub(crate) use write::{collect_checkpoints, write_chunk_to_object_store};
use super::DbChunk;

View File

@@ -14,6 +14,7 @@ use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use crate::db::catalog::chunk::CatalogChunk;
use crate::db::catalog::partition::Partition;
use crate::db::lifecycle::write::collect_checkpoints;
use crate::db::lifecycle::{
collect_rub, merge_schemas, new_rub_chunk, write_chunk_to_object_store,
};
@@ -103,6 +104,8 @@ pub(super) fn persist_chunks(
let persisted_rows = to_persist.rows();
let remainder_rows = remainder.rows();
let (partition_checkpoint, database_checkpoint) =
collect_checkpoints(&partition, &partition_key, &table_name, &db.catalog);
let persist_fut = {
let mut partition = partition.write();
@@ -126,7 +129,7 @@
// Drop partition lock guard after locking chunk
std::mem::drop(partition);
write_chunk_to_object_store(to_persist)?.1
write_chunk_to_object_store(to_persist, partition_checkpoint, database_checkpoint)?.1
};
// Wait for write operation to complete

View File

@@ -1,6 +1,10 @@
//! This module contains the code to write chunks to the object store
use crate::db::{
catalog::chunk::{CatalogChunk, ChunkStage},
catalog::{
chunk::{CatalogChunk, ChunkStage},
partition::Partition,
Catalog,
},
checkpoint_data_from_catalog,
lifecycle::LockableCatalogChunk,
DbChunk,
@@ -24,8 +28,8 @@ use persistence_windows::checkpoint::{
};
use query::QueryChunk;
use snafu::ResultExt;
use std::{collections::BTreeMap, future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use std::{future::Future, sync::Arc};
use tracker::{RwLock, TaskTracker, TrackedFuture, TrackedFutureExt};
use super::error::{
CommitError, Error, ParquetChunkError, Result, TransactionError, WritingToObjectStore,
@@ -37,6 +41,8 @@ use super::error::{
/// The caller can either spawn this future to tokio, or block directly on it
pub fn write_chunk_to_object_store(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
partition_checkpoint: PartitionCheckpoint,
database_checkpoint: DatabaseCheckpoint,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
@@ -101,11 +107,6 @@ pub fn write_chunk_to_object_store(
//
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let (partition_checkpoint, database_checkpoint) =
fake_partition_and_database_checkpoint(
Arc::clone(&addr.table_name),
Arc::clone(&addr.partition_key),
);
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: Arc::clone(&addr.table_name),
@@ -189,22 +190,37 @@
Ok((tracker, fut.track(registration)))
}
/// Fake until we have the split implementation in-place.
fn fake_partition_and_database_checkpoint(
table_name: Arc<str>,
partition_key: Arc<str>,
/// Construct partition and database checkpoint for the given partition in the given catalog.
pub fn collect_checkpoints(
partition: &RwLock<Partition>,
partition_key: &str,
table_name: &str,
catalog: &Catalog,
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// create partition checkpoint
let sequencer_numbers = BTreeMap::new();
let min_unpersisted_timestamp = Utc::now();
let partition_checkpoint = PartitionCheckpoint::new(
table_name,
partition_key,
sequencer_numbers,
min_unpersisted_timestamp,
);
// calculate checkpoint
let mut checkpoint_builder = {
// Record partition checkpoint and then flush persisted data from persistence windows.
let partition = partition.read();
PersistCheckpointBuilder::new(
partition
.partition_checkpoint()
.expect("persistence window removed"),
)
};
// build database checkpoint
let builder = PersistCheckpointBuilder::new(partition_checkpoint);
builder.build()
// collect checkpoints of all other partitions
if let Ok(table) = catalog.table(table_name) {
for partition in table.partitions() {
let partition = partition.read();
if partition.key() == partition_key {
continue;
}
if let Some(partition_checkpoint) = partition.partition_checkpoint() {
checkpoint_builder.register_other_partition(&partition_checkpoint);
}
}
}
checkpoint_builder.build()
}