chore: Merge branch 'main' into ntran/dedup_less_concat

pull/24376/head
Nga Tran 2021-06-30 16:45:01 -04:00
commit e8ef8e2790
15 changed files with 467 additions and 426 deletions

View File

@@ -78,8 +78,7 @@ pub trait LifecycleDb {
 pub trait LockablePartition: Sized {
     type Partition: LifecyclePartition;
     type Chunk: LockableChunk;
-    type DropError: std::error::Error + Send + Sync;
-    type CompactError: std::error::Error + Send + Sync;
+    type Error: std::error::Error + Send + Sync;
     /// Acquire a shared read lock on the chunk
     fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self>;
@@ -102,13 +101,13 @@ pub trait LockablePartition: Sized {
     fn compact_chunks(
         partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
         chunks: Vec<LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>>,
-    ) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::CompactError>;
+    ) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
     /// Drops a chunk from the partition
     fn drop_chunk(
         s: LifecycleWriteGuard<'_, Self::Partition, Self>,
         chunk_id: u32,
-    ) -> Result<(), Self::DropError>;
+    ) -> Result<(), Self::Error>;
 }
 /// A `LockableChunk` is a wrapper around a `LifecycleChunk` that allows for
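The hunk above collapses the two per-action error types (`DropError`, `CompactError`) into a single associated `Error` on `LockablePartition`. For illustration only, here is a standalone Rust sketch of a trait with one unified associated error type; the names `LockableToyPartition` and `InMemoryPartition` are invented for this example and are not part of IOx:

use std::convert::Infallible;

/// Toy stand-in for a lockable partition: one associated `Error`
/// now covers both compaction and chunk-drop failures.
trait LockableToyPartition {
    type Error: std::error::Error + Send + Sync;

    fn compact_chunks(&mut self, chunk_ids: &[u32]) -> Result<u32, Self::Error>;
    fn drop_chunk(&mut self, chunk_id: u32) -> Result<(), Self::Error>;
}

struct InMemoryPartition {
    chunks: Vec<u32>,
}

impl LockableToyPartition for InMemoryPartition {
    // A single error type is enough for every lifecycle action.
    type Error = Infallible;

    fn compact_chunks(&mut self, chunk_ids: &[u32]) -> Result<u32, Self::Error> {
        // Pretend the compacted data lands in a fresh chunk id.
        self.chunks.retain(|id| !chunk_ids.contains(id));
        let new_id = self.chunks.iter().max().copied().unwrap_or(0) + 1;
        self.chunks.push(new_id);
        Ok(new_id)
    }

    fn drop_chunk(&mut self, chunk_id: u32) -> Result<(), Self::Error> {
        self.chunks.retain(|id| *id != chunk_id);
        Ok(())
    }
}

fn main() {
    let mut partition = InMemoryPartition { chunks: vec![1, 2, 3] };
    let new_id = partition.compact_chunks(&[1, 2]).unwrap();
    partition.drop_chunk(3).unwrap();
    println!("chunks after compaction: {:?}, new id {}", partition.chunks, new_id);
}

A single associated type keeps trait bounds short for callers that only need to know a lifecycle action can fail, regardless of which action it was.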

View File

@@ -599,8 +599,7 @@ mod tests {
     impl<'a> LockablePartition for TestLockablePartition<'a> {
         type Partition = TestPartition;
         type Chunk = TestLockableChunk<'a>;
-        type DropError = Infallible;
-        type CompactError = Infallible;
+        type Error = Infallible;
         fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> {
             LifecycleReadGuard::new(self.clone(), &self.partition)
@@ -640,7 +639,7 @@ mod tests {
         fn compact_chunks(
             mut partition: LifecycleWriteGuard<'_, TestPartition, Self>,
             chunks: Vec<LifecycleWriteGuard<'_, TestChunk, Self::Chunk>>,
-        ) -> Result<TaskTracker<()>, Self::CompactError> {
+        ) -> Result<TaskTracker<()>, Self::Error> {
             let id = partition.next_id;
             partition.next_id += 1;
@@ -665,7 +664,7 @@ mod tests {
         fn drop_chunk(
             mut s: LifecycleWriteGuard<'_, Self::Partition, Self>,
             chunk_id: u32,
-        ) -> Result<(), Self::DropError> {
+        ) -> Result<(), Self::Error> {
             s.chunks.remove(&chunk_id);
             s.data().db.events.write().push(MoverEvents::Drop(chunk_id));
             Ok(())

View File

@@ -28,7 +28,7 @@ async fn setup() -> TestDb {
         .await
         .unwrap()
         .unwrap();
-    db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+    db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
         .await
         .unwrap();
@@ -43,7 +43,7 @@ async fn setup() -> TestDb {
         .await
         .unwrap()
         .unwrap();
-    db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+    db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
         .await
         .unwrap();

View File

@@ -111,7 +111,7 @@ impl DbSetup for NoData {
         assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
         // Now load the closed chunk into the RB
-        db.load_chunk_to_read_buffer(table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(table_name, partition_key, 0)
             .await
             .unwrap();
         assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
@@ -149,7 +149,7 @@ impl DbSetup for NoData {
         assert_eq!(count_object_store_chunks(&db), 0); // nothing yet
         // Now load the closed chunk into the RB
-        db.load_chunk_to_read_buffer(table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(table_name, partition_key, 0)
             .await
             .unwrap();
         assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only
@@ -348,7 +348,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();
@@ -383,7 +383,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();
@@ -394,7 +394,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 1)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 1)
             .await
             .unwrap();
@@ -426,7 +426,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
             .await
             .unwrap();
@@ -443,7 +443,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 1)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 1)
             .await
             .unwrap();
@@ -460,7 +460,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 2)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 2)
            .await
            .unwrap();
@@ -477,7 +477,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
         ];
         write_lp(&db, &lp_lines.join("\n")).await;
         db.rollover_partition("h2o", partition_key).await.unwrap();
-        db.load_chunk_to_read_buffer("h2o", partition_key, 3)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 3)
            .await
            .unwrap();
@@ -512,7 +512,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
         // Use a background task to do the work note when I used
         // TaskTracker::join, it ended up hanging for reasons I don't
         // now
-        db.load_chunk_to_read_buffer("h2o", partition_key, 0)
+        db.move_chunk_to_read_buffer("h2o", partition_key, 0)
            .await
            .unwrap();
@@ -619,7 +619,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
    }
@@ -635,7 +635,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
@@ -655,7 +655,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
        db.write_chunk_to_object_store(&table_name, partition_key, 0)
@@ -712,7 +712,7 @@ pub async fn make_two_chunk_scenarios(
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
    }
@@ -736,11 +736,11 @@
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 1)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
            .await
            .unwrap();
    }
@@ -763,11 +763,11 @@
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 1)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
            .await
            .unwrap();
@@ -798,11 +798,11 @@
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 1)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 1)
            .await
            .unwrap();
@@ -833,7 +833,7 @@ pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
     db.rollover_partition(table_name, partition_key)
         .await
         .unwrap();
-    db.load_chunk_to_read_buffer(table_name, partition_key, 0)
+    db.move_chunk_to_read_buffer(table_name, partition_key, 0)
        .await
        .unwrap();
    db.write_chunk_to_object_store(table_name, partition_key, 0)
@@ -853,7 +853,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
    }
@@ -869,7 +869,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
         db.rollover_partition(&table_name, partition_key)
             .await
             .unwrap();
-        db.load_chunk_to_read_buffer(&table_name, partition_key, 0)
+        db.move_chunk_to_read_buffer(&table_name, partition_key, 0)
            .await
            .unwrap();
        db.write_chunk_to_object_store(&table_name, partition_key, 0)

View File

@@ -15,44 +15,33 @@ use crate::{
     write_buffer::WriteBuffer,
     JobRegistry,
 };
-use ::lifecycle::{LifecycleWriteGuard, LockableChunk};
-use arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use ::lifecycle::LockableChunk;
 use async_trait::async_trait;
 use chrono::Utc;
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkSummary},
+    chunk_metadata::ChunkSummary,
     database_rules::DatabaseRules,
-    job::Job,
     partition_metadata::{PartitionSummary, TableSummary},
     server_id::ServerId,
 };
-use datafusion::{
-    catalog::{catalog::CatalogProvider, schema::SchemaProvider},
-    physical_plan::SendableRecordBatchStream,
-};
+use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
 use entry::{Entry, SequencedEntry};
-use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
 use metrics::KeyValue;
 use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
 use mutable_buffer::persistence_windows::PersistenceWindows;
 use object_store::{path::parsed::DirsAndFileName, ObjectStore};
-use observability_deps::tracing::{debug, error, info, warn};
+use observability_deps::tracing::{debug, error, info};
 use parking_lot::RwLock;
 use parquet_file::{
     catalog::{CheckpointData, PreservedCatalog},
-    chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
-    metadata::IoxMetadata,
-    storage::Storage,
 };
 use query::{exec::Executor, predicate::Predicate, QueryDatabase};
 use rand_distr::{Distribution, Poisson};
-use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ensure, ResultExt, Snafu};
 use std::{
     any::Any,
     collections::HashMap,
-    future::Future,
     num::NonZeroUsize,
     sync::{
         atomic::{AtomicUsize, Ordering},
@@ -60,7 +49,6 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 pub mod access;
 pub mod catalog;
@@ -72,6 +60,7 @@ mod process_clock;
 mod streams;
 mod system_tables;
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(context(false))]
@@ -81,48 +70,10 @@ pub enum Error {
     PartitionError { source: catalog::partition::Error },
     #[snafu(display("Lifecycle error: {}", source))]
-    LifecycleError { source: catalog::chunk::Error },
+    LifecycleError { source: lifecycle::Error },
+    #[snafu(display("Error freeinzing chunk while rolling over partition: {}", source))]
+    FreezingChunk { source: catalog::chunk::Error },
-    #[snafu(display(
-        "Can not drop chunk {}:{}:{} which has an in-progress lifecycle action {}. Wait for this to complete",
-        partition_key,
-        table_name,
-        chunk_id,
-        action
-    ))]
-    DropMovingChunk {
-        partition_key: String,
-        table_name: String,
-        chunk_id: u32,
-        action: String,
-    },
-    #[snafu(display("Read Buffer Error in chunk {}{} : {}", chunk_id, table_name, source))]
-    ReadBufferChunkError {
-        source: read_buffer::Error,
-        table_name: String,
-        chunk_id: u32,
-    },
-    #[snafu(display(
-        "Read Buffer Schema Error in chunk {}:{} : {}",
-        chunk_id,
-        table_name,
-        source
-    ))]
-    ReadBufferChunkSchemaError {
-        source: read_buffer::Error,
-        table_name: String,
-        chunk_id: u32,
-    },
-    #[snafu(display("Error writing to object store: {}", source))]
-    WritingToObjectStore {
-        source: parquet_file::storage::Error,
-    },
-    #[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))]
-    UnknownMutableBufferChunk { chunk_id: u32 },
     #[snafu(display("Error sending entry to write buffer"))]
     WriteBufferError {
@@ -157,22 +108,6 @@ pub enum Error {
     #[snafu(display("Error building sequenced entry: {}", source))]
     SequencedEntryError { source: entry::SequencedEntryError },
-    #[snafu(display("Error while creating parquet chunk: {}", source))]
-    ParquetChunkError { source: parquet_file::chunk::Error },
-    #[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
-    TransactionError {
-        source: parquet_file::catalog::Error,
-    },
-    #[snafu(display("Error while commiting transaction on preserved catalog: {}", source))]
-    CommitError {
-        source: parquet_file::catalog::Error,
-    },
-    #[snafu(display("Cannot write chunk: {}", addr))]
-    CannotWriteChunk { addr: ChunkAddr },
     #[snafu(display("background task cancelled: {}", source))]
     TaskCancelled { source: futures::future::Aborted },
@@ -369,7 +304,7 @@ impl Db {
         if let Some(chunk) = chunk {
             let mut chunk = chunk.write();
-            chunk.freeze().context(LifecycleError)?;
+            chunk.freeze().context(FreezingChunk)?;
             Ok(Some(DbChunk::snapshot(&chunk)))
         } else {
@@ -423,86 +358,16 @@ impl Db {
     /// but the process may take a long time
     ///
     /// Returns a handle to the newly loaded chunk in the read buffer
-    pub async fn load_chunk_to_read_buffer(
+    pub async fn move_chunk_to_read_buffer(
         &self,
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
-        let (_, fut) = Self::load_chunk_to_read_buffer_impl(chunk.write())?;
-        fut.await.context(TaskCancelled)?
-    }
-    /// The implementation for moving a chunk to the read buffer
-    ///
-    /// Returns a future registered with the tracker registry, and the corresponding tracker
-    /// The caller can either spawn this future to tokio, or block directly on it
-    fn load_chunk_to_read_buffer_impl(
-        mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
-    ) -> Result<(
-        TaskTracker<Job>,
-        TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
-    )> {
-        let db = guard.data().db;
-        let addr = guard.addr().clone();
-        // TODO: Use ChunkAddr within Job
-        let (tracker, registration) = db.jobs.register(Job::CloseChunk {
-            db_name: addr.db_name.to_string(),
-            partition_key: addr.partition_key.to_string(),
-            table_name: addr.table_name.to_string(),
-            chunk_id: addr.chunk_id,
-        });
-        // update the catalog to say we are processing this chunk and
-        // then drop the lock while we do the work
-        let (mb_chunk, table_summary) = {
-            let mb_chunk = guard.set_moving(&registration).context(LifecycleError)?;
-            (mb_chunk, guard.table_summary())
-        };
-        // Drop locks
-        let chunk = guard.unwrap().chunk;
-        // create a new read buffer chunk with memory tracking
-        let metrics = db
-            .metrics_registry
-            .register_domain_with_labels("read_buffer", db.metric_labels.clone());
-        let mut rb_chunk = RBChunk::new(
-            &table_summary.name,
-            ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
-        );
-        let fut = async move {
-            info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer");
-            // load table into the new chunk one by one.
-            debug!(chunk=%addr, "loading table to read buffer");
-            let batch = mb_chunk
-                .read_filter(Selection::All)
-                // It is probably reasonable to recover from this error
-                // (reset the chunk state to Open) but until that is
-                // implemented (and tested) just panic
-                .expect("Loading chunk to mutable buffer");
-            let sorted = sort_record_batch(batch).expect("failed to sort");
-            rb_chunk.upsert_table(&table_summary.name, sorted);
-            // Can drop and re-acquire as lifecycle action prevents concurrent modification
-            let mut guard = chunk.write();
-            // update the catalog to say we are done processing
-            guard
-                .set_moved(Arc::new(rb_chunk))
-                .expect("failed to move chunk");
-            debug!(chunk=%addr, "chunk marked MOVED. loading complete");
-            Ok(DbChunk::snapshot(&guard))
-        };
-        Ok((tracker, fut.track(registration)))
+        let (_, fut) =
+            lifecycle::move_chunk_to_read_buffer(chunk.write()).context(LifecycleError)?;
+        fut.await.context(TaskCancelled)?.context(LifecycleError)
     }
     /// Write given table of a given chunk to object store.
@@ -514,170 +379,9 @@ impl Db {
         chunk_id: u32,
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
-        let (_, fut) = Self::write_chunk_to_object_store_impl(chunk.write())?;
-        fut.await.context(TaskCancelled)?
-    }
-    /// The implementation for writing a chunk to the object store
-    ///
-    /// Returns a future registered with the tracker registry, and the corresponding tracker
-    /// The caller can either spawn this future to tokio, or block directly on it
-    fn write_chunk_to_object_store_impl(
-        mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
-    ) -> Result<(
-        TaskTracker<Job>,
-        TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
-    )> {
-        let db = guard.data().db;
-        let addr = guard.addr().clone();
-        // TODO: Use ChunkAddr within Job
-        let (tracker, registration) = db.jobs.register(Job::WriteChunk {
-            db_name: addr.db_name.to_string(),
-            partition_key: addr.partition_key.to_string(),
-            table_name: addr.table_name.to_string(),
-            chunk_id: addr.chunk_id,
-        });
-        // update the catalog to say we are processing this chunk and
-        let rb_chunk = guard
-            .set_writing_to_object_store(&registration)
-            .context(LifecycleError)?;
-        debug!(chunk=%guard.addr(), "chunk marked WRITING , loading tables into object store");
-        // Create a storage to save data of this chunk
-        let storage = Storage::new(Arc::clone(&db.store), db.server_id);
-        let catalog_transactions_until_checkpoint = db
-            .rules
-            .read()
-            .lifecycle_rules
-            .catalog_transactions_until_checkpoint
-            .get();
-        let preserved_catalog = Arc::clone(&db.preserved_catalog);
-        let catalog = Arc::clone(&db.catalog);
-        let object_store = Arc::clone(&db.store);
-        let cleanup_lock = Arc::clone(&db.cleanup_lock);
-        // Drop locks
-        let chunk = guard.unwrap().chunk;
-        let fut = async move {
-            debug!(chunk=%addr, "loading table to object store");
-            let predicate = read_buffer::Predicate::default();
-            // Get RecordBatchStream of data from the read buffer chunk
-            let read_results = rb_chunk.read_filter(&addr.table_name, predicate, Selection::All);
-            let arrow_schema: ArrowSchemaRef = rb_chunk
-                .read_filter_table_schema(Selection::All)
-                .expect("read buffer is infallible")
-                .into();
-            let stream: SendableRecordBatchStream = Box::pin(
-                streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
-            );
-            // check that the upcoming state change will very likely succeed
-            {
-                // re-lock
-                let guard = chunk.read();
-                if matches!(guard.stage(), &ChunkStage::Persisted { .. })
-                    || !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting)
-                {
-                    return Err(Error::CannotWriteChunk {
-                        addr: guard.addr().clone(),
-                    });
-                }
-            }
-            // catalog-level transaction for preservation layer
-            {
-                // fetch shared (= read) guard preventing the cleanup job from deleting our files
-                let _guard = cleanup_lock.read().await;
-                // Write this table data into the object store
-                //
-                // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
-                // between creation and the transaction commit.
-                let metadata = IoxMetadata {
-                    creation_timestamp: Utc::now(),
-                    table_name: addr.table_name.to_string(),
-                    partition_key: addr.partition_key.to_string(),
-                    chunk_id: addr.chunk_id,
-                };
-                let (path, parquet_metadata) = storage
-                    .write_to_object_store(addr, stream, metadata)
-                    .await
-                    .context(WritingToObjectStore)?;
-                let parquet_metadata = Arc::new(parquet_metadata);
-                let metrics = catalog
-                    .metrics_registry
-                    .register_domain_with_labels("parquet", catalog.metric_labels.clone());
-                let metrics =
-                    ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
-                let parquet_chunk = Arc::new(
-                    ParquetChunk::new(
-                        path.clone(),
-                        object_store,
-                        Arc::clone(&parquet_metadata),
-                        metrics,
-                    )
-                    .context(ParquetChunkError)?,
-                );
-                let path: DirsAndFileName = path.into();
-                // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the
-                // transaction lock (that is part of the PreservedCatalog) for too long. By using the
-                // cleanup lock (see above) it is ensured that the file that we have written is not deleted
-                // in between.
-                let mut transaction = preserved_catalog.open_transaction().await;
-                transaction
-                    .add_parquet(&path, &parquet_metadata)
-                    .context(TransactionError)?;
-                // preserved commit
-                let ckpt_handle = transaction.commit().await.context(CommitError)?;
-                // in-mem commit
-                {
-                    let mut guard = chunk.write();
-                    if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
-                        panic!("Chunk written but cannot mark as written {}", e);
-                    }
-                }
-                let create_checkpoint =
-                    ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0;
-                if create_checkpoint {
-                    // Commit is already done, so we can just scan the catalog for the state.
-                    //
-                    // NOTE: There can only be a single transaction in this section because the checkpoint handle holds
-                    // transaction lock. Therefore we don't need to worry about concurrent modifications of
-                    // preserved chunks.
-                    if let Err(e) = ckpt_handle
-                        .create_checkpoint(checkpoint_data_from_catalog(&catalog))
-                        .await
-                    {
-                        warn!(%e, "cannot create catalog checkpoint");
-                        // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
-                        // (both in-mem and within the preserved catalog).
-                    }
-                }
-            }
-            // We know this chunk is ParquetFile type
-            let chunk = chunk.read();
-            Ok(DbChunk::parquet_file_snapshot(&chunk))
-        };
-        Ok((tracker, fut.track(registration)))
+        let (_, fut) =
+            lifecycle::write_chunk_to_object_store(chunk.write()).context(LifecycleError)?;
+        fut.await.context(TaskCancelled)?.context(LifecycleError)
    }
     /// Unload chunk from read buffer but keep it in object store
@@ -689,21 +393,7 @@ impl Db {
     ) -> Result<Arc<DbChunk>> {
         let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?;
         let chunk = chunk.write();
-        Self::unload_read_buffer_impl(chunk)
-    }
-    pub fn unload_read_buffer_impl(
-        mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
-    ) -> Result<Arc<DbChunk>> {
-        debug!(chunk=%chunk.addr(), "unloading chunk from read buffer");
-        chunk
-            .set_unload_from_read_buffer()
-            .context(LifecycleError {})?;
-        debug!(chunk=%chunk.addr(), "chunk marked UNLOADED from read buffer");
-        Ok(DbChunk::snapshot(&chunk))
+        lifecycle::unload_read_buffer_chunk(chunk).context(LifecycleError)
    }
     /// Return chunk summary information for all chunks in the specified
@@ -770,7 +460,7 @@ impl Db {
             self.worker_iterations_lifecycle
                 .fetch_add(1, Ordering::Relaxed);
             tokio::select! {
-                _ = policy.check_for_work(chrono::Utc::now(), std::time::Instant::now()) => {},
+                _ = policy.check_for_work(Utc::now(), std::time::Instant::now()) => {},
                 _ = shutdown.cancelled() => break,
             }
        }
@@ -1051,7 +741,7 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
     for chunk in catalog.chunks() {
         let guard = chunk.read();
-        if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() {
+        if let ChunkStage::Persisted { parquet, .. } = guard.stage() {
             let path: DirsAndFileName = parquet.path().into();
             files.insert(path, parquet.parquet_metadata());
        }
@@ -1107,8 +797,8 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
-    use chrono::Utc;
     use futures::{stream, StreamExt, TryStreamExt};
+    use internal_types::selection::Selection;
     use mutable_buffer::persistence_windows::MinMaxSequence;
     use tokio_util::sync::CancellationToken;
@@ -1133,15 +823,15 @@ mod tests {
     use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
     use crate::{
-        db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
+        db::{
+            catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
+            test_helpers::{try_write_lp, write_lp},
+        },
         utils::{make_db, TestDb},
         write_buffer::test_helpers::MockBuffer,
     };
-    use super::{
-        test_helpers::{try_write_lp, write_lp},
-        *,
-    };
+    use super::*;
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1361,7 +1051,7 @@ mod tests {
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1143)
             .unwrap();
-        db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
+        db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
            .await
            .unwrap();
@@ -1531,7 +1221,7 @@ mod tests {
             .unwrap()
             .unwrap();
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
            .await
            .unwrap();
@@ -1624,7 +1314,7 @@ mod tests {
         let mb = collect_read_filter(&mb_chunk).await;
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
            .await
            .unwrap();
@@ -1734,7 +1424,7 @@ mod tests {
             .unwrap();
         // Move that MB chunk to RB chunk and drop it from MB
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
            .await
            .unwrap();
        // Write the RB chunk to Object Store but keep it in RB
@@ -1833,7 +1523,7 @@ mod tests {
             .unwrap();
         // Move that MB chunk to RB chunk and drop it from MB
         let rb_chunk = db
-            .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
            .await
            .unwrap();
        // Write the RB chunk to Object Store but keep it in RB
@@ -2134,7 +1824,7 @@ mod tests {
             .await
             .unwrap()
             .unwrap();
-        db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
+        db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
            .await
            .unwrap();
@@ -2280,7 +1970,7 @@ mod tests {
         print!("Partitions: {:?}", db.partition_keys().unwrap());
-        db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
+        db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
            .await
            .unwrap();
@@ -2364,7 +2054,7 @@ mod tests {
         write_lp(&db, "mem foo=1 1").await;
         // load a chunk to the read buffer
-        db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id)
+        db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id)
            .await
            .unwrap();
@@ -2552,7 +2242,7 @@ mod tests {
             .unwrap()
             .unwrap();
         let rb_chunk = db
-            .load_chunk_to_read_buffer(table_name, partition_key, mb_chunk.id())
+            .move_chunk_to_read_buffer(table_name, partition_key, mb_chunk.id())
            .await
            .unwrap();
        assert_eq!(mb_chunk.id(), rb_chunk.id());
@@ -2999,7 +2689,7 @@ mod tests {
             mb_chunk.id()
         };
         // Move that MB chunk to RB chunk and drop it from MB
-        db.load_chunk_to_read_buffer(table_name, partition_key, chunk_id)
+        db.move_chunk_to_read_buffer(table_name, partition_key, chunk_id)
            .await
            .unwrap();

View File

@@ -18,7 +18,16 @@ use crate::db::catalog::chunk::CatalogChunk
 use crate::db::catalog::partition::Partition;
 use crate::Db;
+pub(crate) use error::{Error, Result};
+pub(crate) use move_chunk::move_chunk_to_read_buffer;
+pub(crate) use unload::unload_read_buffer_chunk;
+pub(crate) use write::write_chunk_to_object_store;
 mod compact;
+mod error;
+mod move_chunk;
+mod unload;
+mod write;
 ///
 /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
@@ -38,8 +47,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
     type Job = Job;
-    // TODO: Separate error enumeration for lifecycle actions - db::Error is large
-    type Error = super::Error;
+    type Error = Error;
     fn read(&self) -> LifecycleReadGuard<'_, Self::Chunk, Self> {
         LifecycleReadGuard::new(self.clone(), self.chunk.as_ref())
@@ -53,7 +61,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
         s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
     ) -> Result<TaskTracker<Self::Job>, Self::Error> {
         info!(chunk=%s.addr(), "move to read buffer");
-        let (tracker, fut) = Db::load_chunk_to_read_buffer_impl(s)?;
+        let (tracker, fut) = move_chunk::move_chunk_to_read_buffer(s)?;
         let _ = tokio::spawn(async move { fut.await.log_if_error("move to read buffer") });
         Ok(tracker)
     }
@@ -62,7 +70,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
         s: LifecycleWriteGuard<'_, Self::Chunk, Self>,
     ) -> Result<TaskTracker<Self::Job>, Self::Error> {
         info!(chunk=%s.addr(), "writing to object store");
-        let (tracker, fut) = Db::write_chunk_to_object_store_impl(s)?;
+        let (tracker, fut) = write::write_chunk_to_object_store(s)?;
         let _ = tokio::spawn(async move { fut.await.log_if_error("writing to object store") });
         Ok(tracker)
     }
@@ -72,7 +80,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
     ) -> Result<(), Self::Error> {
         info!(chunk=%s.addr(), "unloading from readbuffer");
-        let _ = Db::unload_read_buffer_impl(s)?;
+        let _ = self::unload::unload_read_buffer_chunk(s)?;
         Ok(())
     }
 }
@@ -95,9 +103,7 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
     type Chunk = LockableCatalogChunk<'a>;
-    type DropError = super::catalog::partition::Error;
-    type CompactError = super::lifecycle::compact::Error;
+    type Error = super::lifecycle::Error;
     fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> {
         LifecycleReadGuard::new(self.clone(), self.partition.as_ref())
@@ -135,7 +141,7 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
     fn compact_chunks(
         partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
         chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, Self::Chunk>>,
-    ) -> Result<TaskTracker<Job>, Self::CompactError> {
+    ) -> Result<TaskTracker<Job>, Self::Error> {
         info!(table=%partition.table_name(), partition=%partition.partition_key(), "compacting chunks");
         let (tracker, fut) = compact::compact_chunks(partition, chunks)?;
         let _ = tokio::spawn(async move { fut.await.log_if_error("compacting chunks") });
@@ -145,7 +151,7 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
     fn drop_chunk(
         mut s: LifecycleWriteGuard<'_, Self::Partition, Self>,
         chunk_id: u32,
-    ) -> Result<(), Self::DropError> {
+    ) -> Result<(), Self::Error> {
         s.drop_chunk(chunk_id)?;
         Ok(())
     }

View File

@@ -5,44 +5,22 @@ use std::sync::Arc;
 use futures::StreamExt;
 use hashbrown::HashMap;
-use snafu::Snafu;
 use data_types::job::Job;
 use data_types::partition_metadata::{InfluxDbType, TableSummary};
 use internal_types::schema::sort::SortKey;
 use internal_types::schema::TIME_COLUMN_NAME;
 use lifecycle::LifecycleWriteGuard;
-use query::frontend::reorg::{self, ReorgPlanner};
+use query::frontend::reorg::ReorgPlanner;
 use query::QueryChunkMeta;
 use read_buffer::{ChunkMetrics, RBChunk};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
-use crate::db::catalog::chunk::{self, CatalogChunk};
-use crate::db::catalog::partition::{self, Partition};
+use crate::db::catalog::chunk::CatalogChunk;
+use crate::db::catalog::partition::Partition;
 use crate::db::DbChunk;
-use super::{LockableCatalogChunk, LockableCatalogPartition};
+use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition};
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(context(false))]
-    PartitionError { source: partition::Error },
-    #[snafu(context(false))]
-    ChunkError { source: chunk::Error },
-    #[snafu(context(false))]
-    PlannerError { source: reorg::Error },
-    #[snafu(context(false))]
-    ArrowError { source: arrow::error::ArrowError },
-    #[snafu(context(false))]
-    DataFusionError {
-        source: datafusion::error::DataFusionError,
-    },
-}
-pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Compute a sort key that orders lower cardinality columns first
 ///
@@ -76,7 +54,7 @@ fn compute_sort_key<'a>(summaries: impl Iterator<Item = &'a TableSummary>) -> So
 /// Compact the provided chunks into a single chunk
 ///
 /// TODO: Replace low-level locks with transaction object
-pub(super) fn compact_chunks(
+pub(crate) fn compact_chunks(
     partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition<'_>>,
     chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>>,
 ) -> Result<(
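The hunk above keeps `compute_sort_key`, whose doc comment says it orders lower-cardinality columns first. A freestanding sketch of that idea follows; the function name `toy_sort_key`, the hard-coded `"time"` column, and placing time last are assumptions for this example, not IOx code:

/// Order tag columns by ascending cardinality and (assumed here) put the
/// time column last, mirroring the "lower cardinality columns first" rule.
fn toy_sort_key(column_cardinalities: &[(&str, usize)]) -> Vec<String> {
    let mut tags: Vec<(&str, usize)> = column_cardinalities
        .iter()
        .copied()
        .filter(|(name, _)| *name != "time")
        .collect();
    tags.sort_by_key(|&(_, cardinality)| cardinality);

    let mut key: Vec<String> = tags.iter().map(|(name, _)| name.to_string()).collect();
    key.push("time".to_string());
    key
}

fn main() {
    let cardinalities = [("host", 10), ("region", 2), ("time", 100_000)];
    assert_eq!(toy_sort_key(&cardinalities), vec!["region", "host", "time"]);
    println!("sort key: {:?}", toy_sort_key(&cardinalities));
}

The usual motivation for this ordering is that low-cardinality columns produce long runs of equal values once sorted, which deduplicate and compress well.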

View File

@@ -0,0 +1,60 @@
//! Errors that can occur during lifecycle actions
use snafu::Snafu;
use data_types::chunk_metadata::ChunkAddr;
use crate::db::catalog;
#[derive(Debug, Snafu)]
// Export the snafu "selectors" so they can be used in other modules
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(context(false))]
PartitionError { source: catalog::partition::Error },
#[snafu(context(false))]
ChunkError { source: catalog::chunk::Error },
#[snafu(context(false))]
PlannerError {
source: query::frontend::reorg::Error,
},
#[snafu(context(false))]
ArrowError { source: arrow::error::ArrowError },
#[snafu(context(false))]
DataFusionError {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Read Buffer Error in chunk {}{} : {}", chunk_id, table_name, source))]
ReadBufferChunkError {
source: read_buffer::Error,
table_name: String,
chunk_id: u32,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore {
source: parquet_file::storage::Error,
},
#[snafu(display("Error while creating parquet chunk: {}", source))]
ParquetChunkError { source: parquet_file::chunk::Error },
#[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
TransactionError {
source: parquet_file::catalog::Error,
},
#[snafu(display("Error while commiting transaction on preserved catalog: {}", source))]
CommitError {
source: parquet_file::catalog::Error,
},
#[snafu(display("Cannot write chunk: {}", addr))]
CannotWriteChunk { addr: ChunkAddr },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
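Most variants in the new error module are marked `#[snafu(context(false))]`, which makes snafu generate a `From<source>` impl so the `?` operator converts the source error without an explicit `.context(..)` call. A self-contained sketch of the same pattern follows; the `ToyError` enum and `parse_small` function are made up for illustration, and snafu 0.6-style context selectors are assumed, matching the style used elsewhere in this diff:

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
pub enum ToyError {
    // `context(false)` asks snafu to generate `From<ParseIntError>`,
    // so `?` converts the source error with no explicit `.context(..)`.
    #[snafu(context(false))]
    Parse { source: std::num::ParseIntError },

    #[snafu(display("value {} is out of range", value))]
    OutOfRange { value: i64 },
}

pub fn parse_small(input: &str) -> Result<i64, ToyError> {
    let value: i64 = input.parse()?; // converted via the generated From impl
    ensure!(value <= 100, OutOfRange { value });
    Ok(value)
}

fn main() {
    assert!(parse_small("42").is_ok());
    assert!(parse_small("forty-two").is_err());
    assert!(parse_small("1000").is_err());
}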

View File

@@ -0,0 +1,83 @@
use crate::db::catalog::chunk::CatalogChunk;
pub(crate) use crate::db::chunk::DbChunk;
use ::lifecycle::LifecycleWriteGuard;
use data_types::job::Job;
use internal_types::{arrow::sort::sort_record_batch, selection::Selection};
use observability_deps::tracing::{debug, info};
use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::{error::Result, LockableCatalogChunk};
/// The implementation for moving a chunk to the read buffer
///
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub fn move_chunk_to_read_buffer(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = guard.data().db;
let addr = guard.addr().clone();
// TODO: Use ChunkAddr within Job
let (tracker, registration) = db.jobs.register(Job::CloseChunk {
db_name: addr.db_name.to_string(),
partition_key: addr.partition_key.to_string(),
table_name: addr.table_name.to_string(),
chunk_id: addr.chunk_id,
});
// update the catalog to say we are processing this chunk and
// then drop the lock while we do the work
let (mb_chunk, table_summary) = {
let mb_chunk = guard.set_moving(&registration)?;
(mb_chunk, guard.table_summary())
};
// Drop locks
let chunk = guard.unwrap().chunk;
// create a new read buffer chunk with memory tracking
let metrics = db
.metrics_registry
.register_domain_with_labels("read_buffer", db.metric_labels.clone());
let mut rb_chunk = RBChunk::new(
&table_summary.name,
ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
);
let fut = async move {
info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer");
// load table into the new chunk one by one.
debug!(chunk=%addr, "loading table to read buffer");
let batch = mb_chunk
.read_filter(Selection::All)
// It is probably reasonable to recover from this error
// (reset the chunk state to Open) but until that is
// implemented (and tested) just panic
.expect("Loading chunk to mutable buffer");
let sorted = sort_record_batch(batch).expect("failed to sort");
rb_chunk.upsert_table(&table_summary.name, sorted);
// Can drop and re-acquire as lifecycle action prevents concurrent modification
let mut guard = chunk.write();
// update the catalog to say we are done processing
guard
.set_moved(Arc::new(rb_chunk))
.expect("failed to move chunk");
debug!(chunk=%addr, "chunk marked MOVED. loading complete");
Ok(DbChunk::snapshot(&guard))
};
Ok((tracker, fut.track(registration)))
}
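The doc comment above describes returning a tracker plus a `TrackedFuture` so the caller can either spawn the future on tokio or block on it directly. Below is a stripped-down sketch of that shape using only std and tokio; the `start_move` function and the `AtomicBool` "tracker" are stand-ins invented here, while the real code uses the `tracker` crate's `TaskTracker`/`TrackedFuture`:

use std::future::Future;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Toy version of the "return a tracker plus a future" pattern: the caller
/// gets a cheap handle it can poll for completion, and a future it may
/// either `.await` or hand to a runtime.
fn start_move(chunk_id: u32) -> (Arc<AtomicBool>, impl Future<Output = u32>) {
    let done = Arc::new(AtomicBool::new(false));
    let tracker = Arc::clone(&done);

    let fut = async move {
        // ... the actual work would happen here ...
        done.store(true, Ordering::SeqCst);
        chunk_id
    };

    (tracker, fut)
}

#[tokio::main]
async fn main() {
    // Block directly on the future...
    let (tracker, fut) = start_move(7);
    assert_eq!(fut.await, 7);
    assert!(tracker.load(Ordering::SeqCst));

    // ...or spawn it and keep only the tracker around.
    let (tracker, fut) = start_move(8);
    tokio::spawn(fut);
    let _ = tracker; // can be checked later without holding the future
}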

View File

@@ -0,0 +1,24 @@
//! This module contains the code to unload chunks from the read buffer
use std::sync::Arc;
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::debug;
use crate::db::{catalog::chunk::CatalogChunk, DbChunk};
use super::LockableCatalogChunk;
use super::error::Result;
pub fn unload_read_buffer_chunk(
mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
) -> Result<Arc<DbChunk>> {
debug!(chunk=%chunk.addr(), "unloading chunk from read buffer");
chunk.set_unload_from_read_buffer()?;
debug!(chunk=%chunk.addr(), "chunk marked UNLOADED from read buffer");
Ok(DbChunk::snapshot(&chunk))
}

View File

@@ -0,0 +1,189 @@
//! This module contains the code to write chunks to the object store
use crate::db::{
catalog::chunk::{CatalogChunk, ChunkStage},
checkpoint_data_from_catalog,
lifecycle::LockableCatalogChunk,
streams, DbChunk,
};
use ::lifecycle::LifecycleWriteGuard;
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use chrono::Utc;
use data_types::job::Job;
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::selection::Selection;
use object_store::path::parsed::DirsAndFileName;
use observability_deps::tracing::{debug, warn};
use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
metadata::IoxMetadata,
storage::Storage,
};
use snafu::ResultExt;
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
use super::error::{
CommitError, Error, ParquetChunkError, Result, TransactionError, WritingToObjectStore,
};
/// The implementation for writing a chunk to the object store
///
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub fn write_chunk_to_object_store(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = guard.data().db;
let addr = guard.addr().clone();
// TODO: Use ChunkAddr within Job
let (tracker, registration) = db.jobs.register(Job::WriteChunk {
db_name: addr.db_name.to_string(),
partition_key: addr.partition_key.to_string(),
table_name: addr.table_name.to_string(),
chunk_id: addr.chunk_id,
});
// update the catalog to say we are processing this chunk and
let rb_chunk = guard.set_writing_to_object_store(&registration)?;
debug!(chunk=%guard.addr(), "chunk marked WRITING , loading tables into object store");
// Create a storage to save data of this chunk
let storage = Storage::new(Arc::clone(&db.store), db.server_id);
let catalog_transactions_until_checkpoint = db
.rules
.read()
.lifecycle_rules
.catalog_transactions_until_checkpoint
.get();
let preserved_catalog = Arc::clone(&db.preserved_catalog);
let catalog = Arc::clone(&db.catalog);
let object_store = Arc::clone(&db.store);
let cleanup_lock = Arc::clone(&db.cleanup_lock);
// Drop locks
let chunk = guard.unwrap().chunk;
let fut = async move {
debug!(chunk=%addr, "loading table to object store");
let predicate = read_buffer::Predicate::default();
// Get RecordBatchStream of data from the read buffer chunk
let read_results = rb_chunk.read_filter(&addr.table_name, predicate, Selection::All);
let arrow_schema: ArrowSchemaRef = rb_chunk
.read_filter_table_schema(Selection::All)
.expect("read buffer is infallible")
.into();
let stream: SendableRecordBatchStream = Box::pin(streams::ReadFilterResultsStream::new(
read_results,
Arc::clone(&arrow_schema),
));
// check that the upcoming state change will very likely succeed
{
// re-lock
let guard = chunk.read();
if matches!(guard.stage(), &ChunkStage::Persisted { .. })
|| !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting)
{
return Err(Error::CannotWriteChunk {
addr: guard.addr().clone(),
});
}
}
// catalog-level transaction for preservation layer
{
// fetch shared (= read) guard preventing the cleanup job from deleting our files
let _guard = cleanup_lock.read().await;
// Write this table data into the object store
//
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let metadata = IoxMetadata {
creation_timestamp: Utc::now(),
table_name: addr.table_name.to_string(),
partition_key: addr.partition_key.to_string(),
chunk_id: addr.chunk_id,
};
let (path, parquet_metadata) = storage
.write_to_object_store(addr, stream, metadata)
.await
.context(WritingToObjectStore)?;
let parquet_metadata = Arc::new(parquet_metadata);
let metrics = catalog
.metrics_registry
.register_domain_with_labels("parquet", catalog.metric_labels.clone());
let metrics = ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
let parquet_chunk = Arc::new(
ParquetChunk::new(
path.clone(),
object_store,
Arc::clone(&parquet_metadata),
metrics,
)
.context(ParquetChunkError)?,
);
let path: DirsAndFileName = path.into();
// IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the
// transaction lock (that is part of the PreservedCatalog) for too long. By using the
// cleanup lock (see above) it is ensured that the file that we have written is not deleted
// in between.
let mut transaction = preserved_catalog.open_transaction().await;
transaction
.add_parquet(&path, &parquet_metadata)
.context(TransactionError)?;
// preserved commit
let ckpt_handle = transaction.commit().await.context(CommitError)?;
// in-mem commit
{
let mut guard = chunk.write();
if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
panic!("Chunk written but cannot mark as written {}", e);
}
}
let create_checkpoint =
ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0;
if create_checkpoint {
// Commit is already done, so we can just scan the catalog for the state.
//
// NOTE: There can only be a single transaction in this section because the checkpoint handle holds
// transaction lock. Therefore we don't need to worry about concurrent modifications of
// preserved chunks.
if let Err(e) = ckpt_handle
.create_checkpoint(checkpoint_data_from_catalog(&catalog))
.await
{
warn!(%e, "cannot create catalog checkpoint");
// That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
// (both in-mem and within the preserved catalog).
}
}
}
// We know this chunk is ParquetFile type
let chunk = chunk.read();
Ok(DbChunk::parquet_file_snapshot(&chunk))
};
Ok((tracker, fut.track(registration)))
}
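The IMPORTANT comments above encode a lock-ordering rule: do the slow object-store write while holding the shared cleanup lock, and open the exclusive catalog transaction only afterwards so it is held briefly. A freestanding sketch of that ordering follows; the name `write_then_commit` is invented, and tokio's `RwLock`/`Mutex` stand in for the real cleanup lock and preserved-catalog transaction:

use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};

/// Hold the shared cleanup lock across the (slow) upload, then take the
/// (exclusive) "transaction" lock only for the quick commit step.
async fn write_then_commit(
    cleanup_lock: Arc<RwLock<()>>,
    transaction_lock: Arc<Mutex<Vec<String>>>,
    file_name: String,
) {
    // Shared guard: many writers may run concurrently, but a cleanup job
    // taking the write half cannot delete files underneath us.
    let _cleanup_guard = cleanup_lock.read().await;

    // Slow part: pretend to upload the parquet file.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;

    // Fast part: the exclusive "transaction" is opened only after the
    // upload, so it is held as briefly as possible.
    let mut transaction = transaction_lock.lock().await;
    transaction.push(file_name);
}

#[tokio::main]
async fn main() {
    let cleanup_lock = Arc::new(RwLock::new(()));
    let transaction_lock = Arc::new(Mutex::new(Vec::new()));
    write_then_commit(cleanup_lock, transaction_lock.clone(), "chunk-0.parquet".into()).await;
    let committed = transaction_lock.lock().await;
    println!("committed files: {:?}", *committed);
}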

View File

@@ -80,7 +80,7 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
         .await
         .unwrap();
-    db.load_chunk_to_read_buffer(&table_name, partition_key, chunk_id)
+    db.move_chunk_to_read_buffer(&table_name, partition_key, chunk_id)
         .await
         .unwrap();

View File

@@ -2,7 +2,7 @@
 use std::{pin::Pin, sync::Arc};
 use futures::Stream;
-use observability_deps::tracing::error;
+use observability_deps::tracing::{info, warn};
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
 use tonic::{Interceptor, Request, Response, Streaming};
@@ -70,7 +70,20 @@ impl From<Error> for tonic::Status {
     /// Converts a result from the business logic into the appropriate tonic
     /// status
     fn from(err: Error) -> Self {
-        error!("Error handling Flight gRPC request: {}", err);
+        // An explicit match on the Error enum will ensure appropriate
+        // logging is handled for any new error variants.
+        let msg = "Error handling Flight gRPC request";
+        match err {
+            Error::DatabaseNotFound { .. }
+            | Error::InvalidTicket { .. }
+            | Error::InvalidQuery { .. }
+            // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development
+            | Error::InvalidDatabaseName { .. } => info!(?err, msg),
+            Error::Query { .. } => info!(?err, msg),
+            Error::DictionaryError { .. }
+            | Error::InvalidRecordBatch { .. }
+            | Error::Planning { .. } => warn!(?err, msg),
+        }
         err.to_status()
     }
 }
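The new `From<Error>` impl routes each variant to a log level through an exhaustive match, so adding a variant forces an explicit decision about how loudly to log it. A minimal standalone sketch of the same idea follows; the `ToyRequestError` enum is invented for illustration and uses the `tracing` and `tracing-subscriber` crates directly rather than `observability_deps`:

use tracing::{info, warn};

#[derive(Debug)]
enum ToyRequestError {
    NotFound,
    BadInput,
    Internal,
}

fn log_request_error(err: &ToyRequestError) {
    match err {
        // Client-side problems are expected in normal operation: log quietly.
        ToyRequestError::NotFound | ToyRequestError::BadInput => {
            info!(?err, "error handling request")
        }
        // Server-side problems deserve more attention.
        ToyRequestError::Internal => warn!(?err, "error handling request"),
    }
}

fn main() {
    tracing_subscriber::fmt::init();
    log_request_error(&ToyRequestError::BadInput);
    log_request_error(&ToyRequestError::Internal);
}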

View File

@@ -2762,16 +2762,16 @@ mod tests {
     }
 }
-/// loop and try to make a client connection for 5 seconds,
+/// loop and try to make a client connection for 30 seconds,
 /// returning the result of the connection
 async fn connect_to_server<T>(bind_addr: SocketAddr) -> Result<T, tonic::transport::Error>
 where
     T: NewClient,
 {
-    const MAX_RETRIES: u32 = 10;
+    const MAX_RETRIES: u32 = 30;
     let mut retry_count = 0;
     loop {
-        let mut interval = tokio::time::interval(Duration::from_millis(500));
+        let mut interval = tokio::time::interval(Duration::from_millis(1000));
         match T::connect(format!("http://{}", bind_addr)).await {
             Ok(client) => {

View File

@@ -356,7 +356,7 @@ impl TestServer {
         let try_http_connect = async {
             let client = reqwest::Client::new();
             let url = format!("{}/health", self.addrs().http_base);
-            let mut interval = tokio::time::interval(Duration::from_millis(500));
+            let mut interval = tokio::time::interval(Duration::from_millis(1000));
             loop {
                 match client.get(&url).send().await {
                     Ok(resp) => {
@@ -373,7 +373,7 @@ impl TestServer {
         let pair = future::join(try_http_connect, try_grpc_connect);
-        let capped_check = tokio::time::timeout(Duration::from_secs(3), pair);
+        let capped_check = tokio::time::timeout(Duration::from_secs(30), pair);
         match capped_check.await {
             Ok(_) => {
@@ -412,7 +412,7 @@ impl TestServer {
         // if server ID was set, we can also wait until DBs are loaded
         let check_dbs_loaded = async {
-            let mut interval = tokio::time::interval(Duration::from_millis(500));
+            let mut interval = tokio::time::interval(Duration::from_millis(1000));
             while !management_client
                 .get_server_status()
@@ -424,7 +424,7 @@ impl TestServer {
             }
         };
-        let capped_check = tokio::time::timeout(Duration::from_secs(3), check_dbs_loaded);
+        let capped_check = tokio::time::timeout(Duration::from_secs(30), check_dbs_loaded);
         match capped_check.await {
             Ok(_) => {
@@ -464,7 +464,7 @@ pub async fn grpc_channel(
 }
 pub async fn wait_for_grpc(addrs: &BindAddresses) {
-    let mut interval = tokio::time::interval(Duration::from_millis(500));
+    let mut interval = tokio::time::interval(Duration::from_millis(1000));
     loop {
         match grpc_channel(addrs).await {