refactor: adjust to upstream changes

pull/24376/head
Marco Neumann 2021-07-15 15:11:15 +02:00
parent 2b0a4bbe0a
commit 9683d91f32
4 changed files with 37 additions and 34 deletions

View File

@ -39,7 +39,7 @@ use parquet_file::{
use persistence_windows::persistence_windows::PersistenceWindows; use persistence_windows::persistence_windows::PersistenceWindows;
use query::{exec::Executor, predicate::Predicate, QueryDatabase}; use query::{exec::Executor, predicate::Predicate, QueryDatabase};
use rand_distr::{Distribution, Poisson}; use rand_distr::{Distribution, Poisson};
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{ use std::{
any::Any, any::Any,
collections::HashMap, collections::HashMap,
@ -130,6 +130,18 @@ pub enum Error {
TableBatchSchemaMergeError { TableBatchSchemaMergeError {
source: internal_types::schema::merge::Error, source: internal_types::schema::merge::Error,
}, },
#[snafu(display(
"Unable to generate partition checkpoint to write {}:{}:{}",
table_name,
partition_key,
chunk_id
))]
CannotWriteWithoutCheckpoint {
table_name: String,
partition_key: String,
chunk_id: u32,
},
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -471,9 +483,17 @@ impl Db {
chunk_id: u32, chunk_id: u32,
) -> Result<Arc<DbChunk>> { ) -> Result<Arc<DbChunk>> {
let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?; let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
let partition = self.partition(table_name, partition_key)?; let partition_checkpoint = self
.partition(table_name, partition_key)?
.read()
.partition_checkpoint()
.context(CannotWriteWithoutCheckpoint {
table_name,
partition_key,
chunk_id,
})?;
let (partition_checkpoint, database_checkpoint) = let (partition_checkpoint, database_checkpoint) =
lifecycle::collect_checkpoints(&partition, partition_key, table_name, &self.catalog); lifecycle::collect_checkpoints(partition_checkpoint, &self.catalog);
let (_, fut) = lifecycle::write_chunk_to_object_store( let (_, fut) = lifecycle::write_chunk_to_object_store(
chunk.write(), chunk.write(),
partition_checkpoint, partition_checkpoint,

View File

@ -295,20 +295,10 @@ impl Partition {
/// Construct partition checkpoint out of contained persistence window, if any. /// Construct partition checkpoint out of contained persistence window, if any.
pub fn partition_checkpoint(&self) -> Option<PartitionCheckpoint> { pub fn partition_checkpoint(&self) -> Option<PartitionCheckpoint> {
if let Some(persistence_windows) = self.persistence_windows.as_ref() { self.persistence_windows
if let Some(min_unpersisted_timestamp) = .as_ref()
persistence_windows.minimum_unpersisted_timestamp() .map(|persistence_windows| persistence_windows.checkpoint())
{ .flatten()
return Some(PartitionCheckpoint::new(
Arc::clone(&self.table_name),
Arc::clone(&self.partition_key),
persistence_windows.sequencer_numbers_range(),
min_unpersisted_timestamp,
));
}
}
None
} }
} }

View File

@ -105,7 +105,7 @@ pub(super) fn persist_chunks(
let persisted_rows = to_persist.rows(); let persisted_rows = to_persist.rows();
let remainder_rows = remainder.rows(); let remainder_rows = remainder.rows();
let (partition_checkpoint, database_checkpoint) = let (partition_checkpoint, database_checkpoint) =
collect_checkpoints(&partition, &partition_key, &table_name, &db.catalog); collect_checkpoints(flush_handle.checkpoint(), &db.catalog);
let persist_fut = { let persist_fut = {
let mut partition = partition.write(); let mut partition = partition.write();

View File

@ -2,7 +2,6 @@
use crate::db::{ use crate::db::{
catalog::{ catalog::{
chunk::{CatalogChunk, ChunkStage}, chunk::{CatalogChunk, ChunkStage},
partition::Partition,
Catalog, Catalog,
}, },
checkpoint_data_from_catalog, checkpoint_data_from_catalog,
@ -29,7 +28,7 @@ use persistence_windows::checkpoint::{
use query::QueryChunk; use query::QueryChunk;
use snafu::ResultExt; use snafu::ResultExt;
use std::{future::Future, sync::Arc}; use std::{future::Future, sync::Arc};
use tracker::{RwLock, TaskTracker, TrackedFuture, TrackedFutureExt}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::error::{ use super::error::{
CommitError, Error, ParquetChunkError, Result, TransactionError, WritingToObjectStore, CommitError, Error, ParquetChunkError, Result, TransactionError, WritingToObjectStore,
@ -190,29 +189,23 @@ pub fn write_chunk_to_object_store(
Ok((tracker, fut.track(registration))) Ok((tracker, fut.track(registration)))
} }
/// Construct partition and database checkpoint for the given partition in the given catalog. /// Construct database checkpoint for the given partition checkpoint in the given catalog.
pub fn collect_checkpoints( pub fn collect_checkpoints(
partition: &RwLock<Partition>, partition_checkpoint: PartitionCheckpoint,
partition_key: &str,
table_name: &str,
catalog: &Catalog, catalog: &Catalog,
) -> (PartitionCheckpoint, DatabaseCheckpoint) { ) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// remember partition data
let table_name = Arc::clone(partition_checkpoint.table_name());
let partition_key = Arc::clone(partition_checkpoint.partition_key());
// calculate checkpoint // calculate checkpoint
let mut checkpoint_builder = { let mut checkpoint_builder = PersistCheckpointBuilder::new(partition_checkpoint);
// Record partition checkpoint and then flush persisted data from persistence windows.
let partition = partition.read();
PersistCheckpointBuilder::new(
partition
.partition_checkpoint()
.expect("persistence window removed"),
)
};
// collect checkpoints of all other partitions // collect checkpoints of all other partitions
if let Ok(table) = catalog.table(table_name) { if let Ok(table) = catalog.table(table_name) {
for partition in table.partitions() { for partition in table.partitions() {
let partition = partition.read(); let partition = partition.read();
if partition.key() == partition_key { if partition.key() == partition_key.as_ref() {
continue; continue;
} }