diff --git a/server/src/db.rs b/server/src/db.rs
index 7377cc277d..fa2b1b90ca 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -471,8 +471,15 @@ impl Db {
         chunk_id: u32,
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
-        let (_, fut) =
-            lifecycle::write_chunk_to_object_store(chunk.write()).context(LifecycleError)?;
+        let partition = self.partition(table_name, partition_key)?;
+        let (partition_checkpoint, database_checkpoint) =
+            lifecycle::collect_checkpoints(&partition, partition_key, table_name, &self.catalog);
+        let (_, fut) = lifecycle::write_chunk_to_object_store(
+            chunk.write(),
+            partition_checkpoint,
+            database_checkpoint,
+        )
+        .context(LifecycleError)?;
         fut.await.context(TaskCancelled)?.context(LifecycleError)
     }
 
diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index 9572538fb9..1fa33bcefe 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -32,7 +32,7 @@ pub(crate) use error::{Error, Result};
 pub(crate) use move_chunk::move_chunk_to_read_buffer;
 use persistence_windows::persistence_windows::FlushHandle;
 pub(crate) use unload::unload_read_buffer_chunk;
-pub(crate) use write::write_chunk_to_object_store;
+pub(crate) use write::{collect_checkpoints, write_chunk_to_object_store};
 
 use super::DbChunk;
 
diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs
index 48fb6e6173..9cb431c91f 100644
--- a/server/src/db/lifecycle/persist.rs
+++ b/server/src/db/lifecycle/persist.rs
@@ -14,6 +14,7 @@ use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
 use crate::db::catalog::chunk::CatalogChunk;
 use crate::db::catalog::partition::Partition;
+use crate::db::lifecycle::write::collect_checkpoints;
 use crate::db::lifecycle::{
     collect_rub, merge_schemas, new_rub_chunk, write_chunk_to_object_store,
 };
@@ -103,6 +104,8 @@
     let persisted_rows = to_persist.rows();
     let remainder_rows = remainder.rows();
 
+    let (partition_checkpoint, database_checkpoint) =
+        collect_checkpoints(&partition, &partition_key, &table_name, &db.catalog);
     let persist_fut = {
         let mut partition = partition.write();
 
@@ -126,7 +129,7 @@
         // Drop partition lock guard after locking chunk
         std::mem::drop(partition);
 
-        write_chunk_to_object_store(to_persist)?.1
+        write_chunk_to_object_store(to_persist, partition_checkpoint, database_checkpoint)?.1
     };
 
     // Wait for write operation to complete
diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs
index bb42235520..b3f054c676 100644
--- a/server/src/db/lifecycle/write.rs
+++ b/server/src/db/lifecycle/write.rs
@@ -1,6 +1,10 @@
 //! This module contains the code to write chunks to the object store
 use crate::db::{
-    catalog::chunk::{CatalogChunk, ChunkStage},
+    catalog::{
+        chunk::{CatalogChunk, ChunkStage},
+        partition::Partition,
+        Catalog,
+    },
     checkpoint_data_from_catalog,
     lifecycle::LockableCatalogChunk,
     DbChunk,
@@ -24,8 +28,8 @@ use persistence_windows::checkpoint::{
 };
 use query::QueryChunk;
 use snafu::ResultExt;
-use std::{collections::BTreeMap, future::Future, sync::Arc};
-use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
+use std::{future::Future, sync::Arc};
+use tracker::{RwLock, TaskTracker, TrackedFuture, TrackedFutureExt};
 
 use super::error::{
     CommitError, Error, ParquetChunkError, Result, TransactionError, WritingToObjectStore,
@@ -37,6 +41,8 @@
 /// The caller can either spawn this future to tokio, or block directly on it
 pub fn write_chunk_to_object_store(
     mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
+    partition_checkpoint: PartitionCheckpoint,
+    database_checkpoint: DatabaseCheckpoint,
 ) -> Result<(
     TaskTracker<Job>,
     TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
 )> {
@@ -101,11 +107,6 @@
         //
         // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
         // between creation and the transaction commit.
-        let (partition_checkpoint, database_checkpoint) =
-            fake_partition_and_database_checkpoint(
-                Arc::clone(&addr.table_name),
-                Arc::clone(&addr.partition_key),
-            );
         let metadata = IoxMetadata {
             creation_timestamp: Utc::now(),
             table_name: Arc::clone(&addr.table_name),
@@ -189,22 +190,37 @@
     Ok((tracker, fut.track(registration)))
 }
 
-/// Fake until we have the split implementation in-place.
-fn fake_partition_and_database_checkpoint(
-    table_name: Arc<str>,
-    partition_key: Arc<str>,
+/// Construct partition and database checkpoint for the given partition in the given catalog.
+pub fn collect_checkpoints(
+    partition: &RwLock<Partition>,
+    partition_key: &str,
+    table_name: &str,
+    catalog: &Catalog,
 ) -> (PartitionCheckpoint, DatabaseCheckpoint) {
-    // create partition checkpoint
-    let sequencer_numbers = BTreeMap::new();
-    let min_unpersisted_timestamp = Utc::now();
-    let partition_checkpoint = PartitionCheckpoint::new(
-        table_name,
-        partition_key,
-        sequencer_numbers,
-        min_unpersisted_timestamp,
-    );
+    // calculate checkpoint
+    let mut checkpoint_builder = {
+        // Record partition checkpoint and then flush persisted data from persistence windows.
+        let partition = partition.read();
+        PersistCheckpointBuilder::new(
+            partition
+                .partition_checkpoint()
+                .expect("persistence window removed"),
+        )
+    };
 
-    // build database checkpoint
-    let builder = PersistCheckpointBuilder::new(partition_checkpoint);
-    builder.build()
+    // collect checkpoints of all other partitions
+    if let Ok(table) = catalog.table(table_name) {
+        for partition in table.partitions() {
+            let partition = partition.read();
+            if partition.key() == partition_key {
+                continue;
+            }
+
+            if let Some(partition_checkpoint) = partition.partition_checkpoint() {
+                checkpoint_builder.register_other_partition(&partition_checkpoint);
+            }
+        }
+    }
+
+    checkpoint_builder.build()
 }
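Reviewer note: the new `collect_checkpoints` replaces the placeholder `fake_partition_and_database_checkpoint` by seeding a `PersistCheckpointBuilder` with the checkpoint of the partition being persisted and then registering the checkpoint of every other partition of the same table, so the resulting `DatabaseCheckpoint` reflects real catalog state instead of fabricated empty sequencer numbers. A minimal sketch of that builder flow, with the catalog lookups factored out (`build_checkpoints`, `own`, and `others` are illustrative stand-ins, not names from this diff):

```rust
use persistence_windows::checkpoint::{
    DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder,
};

/// Hypothetical helper: `own` is the checkpoint of the partition being
/// persisted; `others` are the checkpoints of the remaining partitions.
/// In the diff both are read from the catalog under its partition locks.
fn build_checkpoints(
    own: PartitionCheckpoint,
    others: impl IntoIterator<Item = PartitionCheckpoint>,
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
    // Seed the builder with the partition that is being flushed ...
    let mut builder = PersistCheckpointBuilder::new(own);

    // ... then fold in every other partition so the database checkpoint
    // covers the table as a whole, not just the flushed partition.
    for other in others {
        builder.register_other_partition(&other);
    }

    builder.build()
}
```

Presumably the point of registering the other partitions is replay: the persisted `DatabaseCheckpoint` lets startup recovery decide which write-buffer sequence numbers are already fully persisted, which only works if it covers more than the one partition being flushed.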
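Two smaller details worth flagging in review: the builder is seeded inside a scoped block, so the read guard on the partition being persisted is dropped before any other partition is locked and at most one partition lock is held at a time; and `partition_checkpoint()` apparently returns an `Option`, which the code `expect`s for the partition being persisted (its persistence windows must still exist at this point) but merely skips via `if let Some(...)` for the other partitions, which may legitimately have no windows yet.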