diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index 040a41e557..e359a17f50 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -78,8 +78,7 @@ pub trait LifecycleDb { pub trait LockablePartition: Sized { type Partition: LifecyclePartition; type Chunk: LockableChunk; - type DropError: std::error::Error + Send + Sync; - type CompactError: std::error::Error + Send + Sync; + type Error: std::error::Error + Send + Sync; /// Acquire a shared read lock on the chunk fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self>; @@ -102,13 +101,13 @@ pub trait LockablePartition: Sized { fn compact_chunks( partition: LifecycleWriteGuard<'_, Self::Partition, Self>, chunks: Vec::Chunk, Self::Chunk>>, - ) -> Result::Job>, Self::CompactError>; + ) -> Result::Job>, Self::Error>; /// Drops a chunk from the partition fn drop_chunk( s: LifecycleWriteGuard<'_, Self::Partition, Self>, chunk_id: u32, - ) -> Result<(), Self::DropError>; + ) -> Result<(), Self::Error>; } /// A `LockableChunk` is a wrapper around a `LifecycleChunk` that allows for diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 434dec273e..d8ce571bd3 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -599,8 +599,7 @@ mod tests { impl<'a> LockablePartition for TestLockablePartition<'a> { type Partition = TestPartition; type Chunk = TestLockableChunk<'a>; - type DropError = Infallible; - type CompactError = Infallible; + type Error = Infallible; fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> { LifecycleReadGuard::new(self.clone(), &self.partition) @@ -640,7 +639,7 @@ mod tests { fn compact_chunks( mut partition: LifecycleWriteGuard<'_, TestPartition, Self>, chunks: Vec>, - ) -> Result, Self::CompactError> { + ) -> Result, Self::Error> { let id = partition.next_id; partition.next_id += 1; @@ -665,7 +664,7 @@ mod tests { fn drop_chunk( mut s: LifecycleWriteGuard<'_, Self::Partition, Self>, chunk_id: u32, - ) -> Result<(), Self::DropError> { + ) -> Result<(), Self::Error> { s.chunks.remove(&chunk_id); s.data().db.events.write().push(MoverEvents::Drop(chunk_id)); Ok(()) diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index 2e1e4f4ca6..eb48cc9f6b 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -28,7 +28,7 @@ async fn setup() -> TestDb { .await .unwrap() .unwrap(); - db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); @@ -43,7 +43,7 @@ async fn setup() -> TestDb { .await .unwrap() .unwrap(); - db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 2c96456f06..b2160410c0 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -111,7 +111,7 @@ impl DbSetup for NoData { assert_eq!(count_object_store_chunks(&db), 0); // nothing yet // Now load the closed chunk into the RB - db.load_chunk_to_read_buffer(table_name, partition_key, 0) + db.move_chunk_to_read_buffer(table_name, partition_key, 0) .await .unwrap(); assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only @@ -149,7 +149,7 @@ impl DbSetup for NoData { assert_eq!(count_object_store_chunks(&db), 0); // nothing yet // Now load the closed chunk into the RB - db.load_chunk_to_read_buffer(table_name, partition_key, 0) + db.move_chunk_to_read_buffer(table_name, 
partition_key, 0) .await .unwrap(); assert_eq!(count_mutable_buffer_chunks(&db), 0); // open chunk only @@ -348,7 +348,7 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 0) + db.move_chunk_to_read_buffer("h2o", partition_key, 0) .await .unwrap(); @@ -383,7 +383,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 0) + db.move_chunk_to_read_buffer("h2o", partition_key, 0) .await .unwrap(); @@ -394,7 +394,7 @@ impl DbSetup for OneMeasurementTwoChunksDifferentTagSet { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 1) + db.move_chunk_to_read_buffer("h2o", partition_key, 1) .await .unwrap(); @@ -426,7 +426,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 0) + db.move_chunk_to_read_buffer("h2o", partition_key, 0) .await .unwrap(); @@ -443,7 +443,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 1) + db.move_chunk_to_read_buffer("h2o", partition_key, 1) .await .unwrap(); @@ -460,7 +460,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 2) + db.move_chunk_to_read_buffer("h2o", partition_key, 2) .await .unwrap(); @@ -477,7 +477,7 @@ impl DbSetup for OneMeasurementThreeChunksWithDuplicates { ]; write_lp(&db, &lp_lines.join("\n")).await; db.rollover_partition("h2o", partition_key).await.unwrap(); - db.load_chunk_to_read_buffer("h2o", partition_key, 3) + db.move_chunk_to_read_buffer("h2o", partition_key, 3) .await .unwrap(); @@ -512,7 +512,7 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle { // Use a background task to do the work note when I used // TaskTracker::join, it ended up hanging for reasons I don't // now - db.load_chunk_to_read_buffer("h2o", partition_key, 0) + db.move_chunk_to_read_buffer("h2o", partition_key, 0) .await .unwrap(); @@ -619,7 +619,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> db.rollover_partition(&table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); } @@ -635,7 +635,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> db.rollover_partition(&table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); @@ -655,7 +655,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> db.rollover_partition(&table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) 
.await .unwrap(); db.write_chunk_to_object_store(&table_name, partition_key, 0) @@ -712,7 +712,7 @@ pub async fn make_two_chunk_scenarios( db.rollover_partition(&table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); } @@ -736,11 +736,11 @@ pub async fn make_two_chunk_scenarios( .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 1) + db.move_chunk_to_read_buffer(&table_name, partition_key, 1) .await .unwrap(); } @@ -763,11 +763,11 @@ pub async fn make_two_chunk_scenarios( .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 1) + db.move_chunk_to_read_buffer(&table_name, partition_key, 1) .await .unwrap(); @@ -798,11 +798,11 @@ pub async fn make_two_chunk_scenarios( .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 1) + db.move_chunk_to_read_buffer(&table_name, partition_key, 1) .await .unwrap(); @@ -833,7 +833,7 @@ pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) { db.rollover_partition(table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(table_name, partition_key, 0) + db.move_chunk_to_read_buffer(table_name, partition_key, 0) .await .unwrap(); db.write_chunk_to_object_store(table_name, partition_key, 0) @@ -853,7 +853,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( db.rollover_partition(&table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); } @@ -869,7 +869,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( db.rollover_partition(&table_name, partition_key) .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, 0) + db.move_chunk_to_read_buffer(&table_name, partition_key, 0) .await .unwrap(); db.write_chunk_to_object_store(&table_name, partition_key, 0) diff --git a/server/src/db.rs b/server/src/db.rs index 84013b13f0..67c8db474c 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -15,44 +15,33 @@ use crate::{ write_buffer::WriteBuffer, JobRegistry, }; -use ::lifecycle::{LifecycleWriteGuard, LockableChunk}; -use arrow::datatypes::SchemaRef as ArrowSchemaRef; +use ::lifecycle::LockableChunk; use async_trait::async_trait; use chrono::Utc; use data_types::{ - chunk_metadata::{ChunkAddr, ChunkSummary}, + chunk_metadata::ChunkSummary, database_rules::DatabaseRules, - job::Job, partition_metadata::{PartitionSummary, TableSummary}, server_id::ServerId, }; -use datafusion::{ - catalog::{catalog::CatalogProvider, schema::SchemaProvider}, - physical_plan::SendableRecordBatchStream, -}; +use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; use entry::{Entry, SequencedEntry}; -use internal_types::{arrow::sort::sort_record_batch, selection::Selection}; use metrics::KeyValue; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use mutable_buffer::persistence_windows::PersistenceWindows; 
use object_store::{path::parsed::DirsAndFileName, ObjectStore}; -use observability_deps::tracing::{debug, error, info, warn}; +use observability_deps::tracing::{debug, error, info}; use parking_lot::RwLock; use parquet_file::{ catalog::{CheckpointData, PreservedCatalog}, - chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk}, cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files}, - metadata::IoxMetadata, - storage::Storage, }; use query::{exec::Executor, predicate::Predicate, QueryDatabase}; use rand_distr::{Distribution, Poisson}; -use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk}; use snafu::{ensure, ResultExt, Snafu}; use std::{ any::Any, collections::HashMap, - future::Future, num::NonZeroUsize, sync::{ atomic::{AtomicUsize, Ordering}, @@ -60,7 +49,6 @@ use std::{ }, time::{Duration, Instant}, }; -use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; pub mod access; pub mod catalog; @@ -72,6 +60,7 @@ mod process_clock; mod streams; mod system_tables; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Snafu)] pub enum Error { #[snafu(context(false))] @@ -81,48 +70,10 @@ pub enum Error { PartitionError { source: catalog::partition::Error }, #[snafu(display("Lifecycle error: {}", source))] - LifecycleError { source: catalog::chunk::Error }, + LifecycleError { source: lifecycle::Error }, - #[snafu(display( - "Can not drop chunk {}:{}:{} which has an in-progress lifecycle action {}. Wait for this to complete", - partition_key, - table_name, - chunk_id, - action - ))] - DropMovingChunk { - partition_key: String, - table_name: String, - chunk_id: u32, - action: String, - }, - - #[snafu(display("Read Buffer Error in chunk {}{} : {}", chunk_id, table_name, source))] - ReadBufferChunkError { - source: read_buffer::Error, - table_name: String, - chunk_id: u32, - }, - - #[snafu(display( - "Read Buffer Schema Error in chunk {}:{} : {}", - chunk_id, - table_name, - source - ))] - ReadBufferChunkSchemaError { - source: read_buffer::Error, - table_name: String, - chunk_id: u32, - }, - - #[snafu(display("Error writing to object store: {}", source))] - WritingToObjectStore { - source: parquet_file::storage::Error, - }, - - #[snafu(display("Unknown Mutable Buffer Chunk {}", chunk_id))] - UnknownMutableBufferChunk { chunk_id: u32 }, + #[snafu(display("Error freezing chunk while rolling over partition: {}", source))] + FreezingChunk { source: catalog::chunk::Error }, #[snafu(display("Error sending entry to write buffer"))] WriteBufferError { @@ -157,22 +108,6 @@ pub enum Error { #[snafu(display("Error building sequenced entry: {}", source))] SequencedEntryError { source: entry::SequencedEntryError }, - #[snafu(display("Error while creating parquet chunk: {}", source))] - ParquetChunkError { source: parquet_file::chunk::Error }, - - #[snafu(display("Error while handling transaction on preserved catalog: {}", source))] - TransactionError { - source: parquet_file::catalog::Error, - }, - - #[snafu(display("Error while commiting transaction on preserved catalog: {}", source))] - CommitError { - source: parquet_file::catalog::Error, - }, - - #[snafu(display("Cannot write chunk: {}", addr))] - CannotWriteChunk { addr: ChunkAddr }, - #[snafu(display("background task cancelled: {}", source))] TaskCancelled { source: futures::future::Aborted }, @@ -369,7 +304,7 @@ impl Db { if let Some(chunk) = chunk { let mut chunk = chunk.write(); - chunk.freeze().context(LifecycleError)?; + chunk.freeze().context(FreezingChunk)?; Ok(Some(DbChunk::snapshot(&chunk))) }
else { @@ -423,86 +358,16 @@ impl Db { /// but the process may take a long time /// /// Returns a handle to the newly loaded chunk in the read buffer - pub async fn load_chunk_to_read_buffer( + pub async fn move_chunk_to_read_buffer( &self, table_name: &str, partition_key: &str, chunk_id: u32, ) -> Result> { let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?; - let (_, fut) = Self::load_chunk_to_read_buffer_impl(chunk.write())?; - fut.await.context(TaskCancelled)? - } - - /// The implementation for moving a chunk to the read buffer - /// - /// Returns a future registered with the tracker registry, and the corresponding tracker - /// The caller can either spawn this future to tokio, or block directly on it - fn load_chunk_to_read_buffer_impl( - mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, - ) -> Result<( - TaskTracker, - TrackedFuture>> + Send>, - )> { - let db = guard.data().db; - let addr = guard.addr().clone(); - // TODO: Use ChunkAddr within Job - let (tracker, registration) = db.jobs.register(Job::CloseChunk { - db_name: addr.db_name.to_string(), - partition_key: addr.partition_key.to_string(), - table_name: addr.table_name.to_string(), - chunk_id: addr.chunk_id, - }); - - // update the catalog to say we are processing this chunk and - // then drop the lock while we do the work - let (mb_chunk, table_summary) = { - let mb_chunk = guard.set_moving(®istration).context(LifecycleError)?; - (mb_chunk, guard.table_summary()) - }; - - // Drop locks - let chunk = guard.unwrap().chunk; - - // create a new read buffer chunk with memory tracking - let metrics = db - .metrics_registry - .register_domain_with_labels("read_buffer", db.metric_labels.clone()); - - let mut rb_chunk = RBChunk::new( - &table_summary.name, - ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()), - ); - - let fut = async move { - info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer"); - - // load table into the new chunk one by one. - debug!(chunk=%addr, "loading table to read buffer"); - let batch = mb_chunk - .read_filter(Selection::All) - // It is probably reasonable to recover from this error - // (reset the chunk state to Open) but until that is - // implemented (and tested) just panic - .expect("Loading chunk to mutable buffer"); - - let sorted = sort_record_batch(batch).expect("failed to sort"); - rb_chunk.upsert_table(&table_summary.name, sorted); - - // Can drop and re-acquire as lifecycle action prevents concurrent modification - let mut guard = chunk.write(); - - // update the catalog to say we are done processing - guard - .set_moved(Arc::new(rb_chunk)) - .expect("failed to move chunk"); - - debug!(chunk=%addr, "chunk marked MOVED. loading complete"); - - Ok(DbChunk::snapshot(&guard)) - }; - - Ok((tracker, fut.track(registration))) + let (_, fut) = + lifecycle::move_chunk_to_read_buffer(chunk.write()).context(LifecycleError)?; + fut.await.context(TaskCancelled)?.context(LifecycleError) } /// Write given table of a given chunk to object store. @@ -514,170 +379,9 @@ impl Db { chunk_id: u32, ) -> Result> { let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?; - let (_, fut) = Self::write_chunk_to_object_store_impl(chunk.write())?; - fut.await.context(TaskCancelled)? 
- } - - /// The implementation for writing a chunk to the object store - /// - /// Returns a future registered with the tracker registry, and the corresponding tracker - /// The caller can either spawn this future to tokio, or block directly on it - fn write_chunk_to_object_store_impl( - mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, - ) -> Result<( - TaskTracker, - TrackedFuture>> + Send>, - )> { - let db = guard.data().db; - let addr = guard.addr().clone(); - - // TODO: Use ChunkAddr within Job - let (tracker, registration) = db.jobs.register(Job::WriteChunk { - db_name: addr.db_name.to_string(), - partition_key: addr.partition_key.to_string(), - table_name: addr.table_name.to_string(), - chunk_id: addr.chunk_id, - }); - - // update the catalog to say we are processing this chunk and - let rb_chunk = guard - .set_writing_to_object_store(®istration) - .context(LifecycleError)?; - - debug!(chunk=%guard.addr(), "chunk marked WRITING , loading tables into object store"); - - // Create a storage to save data of this chunk - let storage = Storage::new(Arc::clone(&db.store), db.server_id); - - let catalog_transactions_until_checkpoint = db - .rules - .read() - .lifecycle_rules - .catalog_transactions_until_checkpoint - .get(); - - let preserved_catalog = Arc::clone(&db.preserved_catalog); - let catalog = Arc::clone(&db.catalog); - let object_store = Arc::clone(&db.store); - let cleanup_lock = Arc::clone(&db.cleanup_lock); - - // Drop locks - let chunk = guard.unwrap().chunk; - - let fut = async move { - debug!(chunk=%addr, "loading table to object store"); - - let predicate = read_buffer::Predicate::default(); - - // Get RecordBatchStream of data from the read buffer chunk - let read_results = rb_chunk.read_filter(&addr.table_name, predicate, Selection::All); - - let arrow_schema: ArrowSchemaRef = rb_chunk - .read_filter_table_schema(Selection::All) - .expect("read buffer is infallible") - .into(); - - let stream: SendableRecordBatchStream = Box::pin( - streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)), - ); - - // check that the upcoming state change will very likely succeed - { - // re-lock - let guard = chunk.read(); - if matches!(guard.stage(), &ChunkStage::Persisted { .. }) - || !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting) - { - return Err(Error::CannotWriteChunk { - addr: guard.addr().clone(), - }); - } - } - - // catalog-level transaction for preservation layer - { - // fetch shared (= read) guard preventing the cleanup job from deleting our files - let _guard = cleanup_lock.read().await; - - // Write this table data into the object store - // - // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted - // between creation and the transaction commit. 
- let metadata = IoxMetadata { - creation_timestamp: Utc::now(), - table_name: addr.table_name.to_string(), - partition_key: addr.partition_key.to_string(), - chunk_id: addr.chunk_id, - }; - let (path, parquet_metadata) = storage - .write_to_object_store(addr, stream, metadata) - .await - .context(WritingToObjectStore)?; - let parquet_metadata = Arc::new(parquet_metadata); - - let metrics = catalog - .metrics_registry - .register_domain_with_labels("parquet", catalog.metric_labels.clone()); - let metrics = - ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet()); - let parquet_chunk = Arc::new( - ParquetChunk::new( - path.clone(), - object_store, - Arc::clone(&parquet_metadata), - metrics, - ) - .context(ParquetChunkError)?, - ); - - let path: DirsAndFileName = path.into(); - - // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the - // transaction lock (that is part of the PreservedCatalog) for too long. By using the - // cleanup lock (see above) it is ensured that the file that we have written is not deleted - // in between. - let mut transaction = preserved_catalog.open_transaction().await; - transaction - .add_parquet(&path, &parquet_metadata) - .context(TransactionError)?; - - // preserved commit - let ckpt_handle = transaction.commit().await.context(CommitError)?; - - // in-mem commit - { - let mut guard = chunk.write(); - if let Err(e) = guard.set_written_to_object_store(parquet_chunk) { - panic!("Chunk written but cannot mark as written {}", e); - } - } - - let create_checkpoint = - ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0; - if create_checkpoint { - // Commit is already done, so we can just scan the catalog for the state. - // - // NOTE: There can only be a single transaction in this section because the checkpoint handle holds - // transaction lock. Therefore we don't need to worry about concurrent modifications of - // preserved chunks. - if let Err(e) = ckpt_handle - .create_checkpoint(checkpoint_data_from_catalog(&catalog)) - .await - { - warn!(%e, "cannot create catalog checkpoint"); - - // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed - // (both in-mem and within the preserved catalog). - } - } - } - - // We know this chunk is ParquetFile type - let chunk = chunk.read(); - Ok(DbChunk::parquet_file_snapshot(&chunk)) - }; - - Ok((tracker, fut.track(registration))) + let (_, fut) = + lifecycle::write_chunk_to_object_store(chunk.write()).context(LifecycleError)?; + fut.await.context(TaskCancelled)?.context(LifecycleError) } /// Unload chunk from read buffer but keep it in object store @@ -689,21 +393,7 @@ impl Db { ) -> Result> { let chunk = self.lockable_chunk(table_name, partition_key, chunk_id)?; let chunk = chunk.write(); - Self::unload_read_buffer_impl(chunk) - } - - pub fn unload_read_buffer_impl( - mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, - ) -> Result> { - debug!(chunk=%chunk.addr(), "unloading chunk from read buffer"); - - chunk - .set_unload_from_read_buffer() - .context(LifecycleError {})?; - - debug!(chunk=%chunk.addr(), "chunk marked UNLOADED from read buffer"); - - Ok(DbChunk::snapshot(&chunk)) + lifecycle::unload_read_buffer_chunk(chunk).context(LifecycleError) } /// Return chunk summary information for all chunks in the specified @@ -770,7 +460,7 @@ impl Db { self.worker_iterations_lifecycle .fetch_add(1, Ordering::Relaxed); tokio::select! 
{ - _ = policy.check_for_work(chrono::Utc::now(), std::time::Instant::now()) => {}, + _ = policy.check_for_work(Utc::now(), std::time::Instant::now()) => {}, _ = shutdown.cancelled() => break, } } @@ -1051,7 +741,7 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData for chunk in catalog.chunks() { let guard = chunk.read(); - if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() { + if let ChunkStage::Persisted { parquet, .. } = guard.stage() { let path: DirsAndFileName = parquet.path().into(); files.insert(path, parquet.parquet_metadata()); } @@ -1107,8 +797,8 @@ mod tests { use arrow::record_batch::RecordBatch; use bytes::Bytes; - use chrono::Utc; use futures::{stream, StreamExt, TryStreamExt}; + use internal_types::selection::Selection; use mutable_buffer::persistence_windows::MinMaxSequence; use tokio_util::sync::CancellationToken; @@ -1133,15 +823,15 @@ mod tests { use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; use crate::{ - db::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}, + db::{ + catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}, + test_helpers::{try_write_lp, write_lp}, + }, utils::{make_db, TestDb}, write_buffer::test_helpers::MockBuffer, }; - use super::{ - test_helpers::{try_write_lp, write_lp}, - *, - }; + use super::*; type Error = Box; type Result = std::result::Result; @@ -1361,7 +1051,7 @@ mod tests { catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1143) .unwrap(); - db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", 0) + db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0) .await .unwrap(); @@ -1531,7 +1221,7 @@ mod tests { .unwrap() .unwrap(); let rb_chunk = db - .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); @@ -1624,7 +1314,7 @@ mod tests { let mb = collect_read_filter(&mb_chunk).await; let rb_chunk = db - .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); @@ -1734,7 +1424,7 @@ mod tests { .unwrap(); // Move that MB chunk to RB chunk and drop it from MB let rb_chunk = db - .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); // Write the RB chunk to Object Store but keep it in RB @@ -1833,7 +1523,7 @@ mod tests { .unwrap(); // Move that MB chunk to RB chunk and drop it from MB let rb_chunk = db - .load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + .move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); // Write the RB chunk to Object Store but keep it in RB @@ -2134,7 +1824,7 @@ mod tests { .await .unwrap() .unwrap(); - db.load_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) + db.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id()) .await .unwrap(); @@ -2280,7 +1970,7 @@ mod tests { print!("Partitions: {:?}", db.partition_keys().unwrap()); - db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", 0) + db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0) .await .unwrap(); @@ -2364,7 +2054,7 @@ mod tests { write_lp(&db, "mem foo=1 1").await; // load a chunk to the read buffer - db.load_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id) + db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", chunk_id) .await .unwrap(); @@ -2552,7 +2242,7 @@ mod tests { .unwrap() .unwrap(); let rb_chunk = db - 
.load_chunk_to_read_buffer(table_name, partition_key, mb_chunk.id()) + .move_chunk_to_read_buffer(table_name, partition_key, mb_chunk.id()) .await .unwrap(); assert_eq!(mb_chunk.id(), rb_chunk.id()); @@ -2999,7 +2689,7 @@ mod tests { mb_chunk.id() }; // Move that MB chunk to RB chunk and drop it from MB - db.load_chunk_to_read_buffer(table_name, partition_key, chunk_id) + db.move_chunk_to_read_buffer(table_name, partition_key, chunk_id) .await .unwrap(); diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 51a139af45..ab28d42f6c 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -18,7 +18,16 @@ use crate::db::catalog::chunk::CatalogChunk; use crate::db::catalog::partition::Partition; use crate::Db; +pub(crate) use error::{Error, Result}; +pub(crate) use move_chunk::move_chunk_to_read_buffer; +pub(crate) use unload::unload_read_buffer_chunk; +pub(crate) use write::write_chunk_to_object_store; + mod compact; +mod error; +mod move_chunk; +mod unload; +mod write; /// /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db` @@ -38,8 +47,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> { type Job = Job; - // TODO: Separate error enumeration for lifecycle actions - db::Error is large - type Error = super::Error; + type Error = Error; fn read(&self) -> LifecycleReadGuard<'_, Self::Chunk, Self> { LifecycleReadGuard::new(self.clone(), self.chunk.as_ref()) @@ -53,7 +61,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> { s: LifecycleWriteGuard<'_, Self::Chunk, Self>, ) -> Result, Self::Error> { info!(chunk=%s.addr(), "move to read buffer"); - let (tracker, fut) = Db::load_chunk_to_read_buffer_impl(s)?; + let (tracker, fut) = move_chunk::move_chunk_to_read_buffer(s)?; let _ = tokio::spawn(async move { fut.await.log_if_error("move to read buffer") }); Ok(tracker) } @@ -62,7 +70,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> { s: LifecycleWriteGuard<'_, Self::Chunk, Self>, ) -> Result, Self::Error> { info!(chunk=%s.addr(), "writing to object store"); - let (tracker, fut) = Db::write_chunk_to_object_store_impl(s)?; + let (tracker, fut) = write::write_chunk_to_object_store(s)?; let _ = tokio::spawn(async move { fut.await.log_if_error("writing to object store") }); Ok(tracker) } @@ -72,7 +80,7 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> { ) -> Result<(), Self::Error> { info!(chunk=%s.addr(), "unloading from readbuffer"); - let _ = Db::unload_read_buffer_impl(s)?; + let _ = self::unload::unload_read_buffer_chunk(s)?; Ok(()) } } @@ -95,9 +103,7 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> { type Chunk = LockableCatalogChunk<'a>; - type DropError = super::catalog::partition::Error; - - type CompactError = super::lifecycle::compact::Error; + type Error = super::lifecycle::Error; fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> { LifecycleReadGuard::new(self.clone(), self.partition.as_ref()) @@ -135,7 +141,7 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> { fn compact_chunks( partition: LifecycleWriteGuard<'_, Self::Partition, Self>, chunks: Vec>, - ) -> Result, Self::CompactError> { + ) -> Result, Self::Error> { info!(table=%partition.table_name(), partition=%partition.partition_key(), "compacting chunks"); let (tracker, fut) = compact::compact_chunks(partition, chunks)?; let _ = tokio::spawn(async move { fut.await.log_if_error("compacting chunks") }); @@ -145,7 +151,7 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> { fn drop_chunk( mut s: 
LifecycleWriteGuard<'_, Self::Partition, Self>, chunk_id: u32, - ) -> Result<(), Self::DropError> { + ) -> Result<(), Self::Error> { s.drop_chunk(chunk_id)?; Ok(()) } diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index 280384ceb4..1db65498ea 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -5,44 +5,22 @@ use std::sync::Arc; use futures::StreamExt; use hashbrown::HashMap; -use snafu::Snafu; use data_types::job::Job; use data_types::partition_metadata::{InfluxDbType, TableSummary}; use internal_types::schema::sort::SortKey; use internal_types::schema::TIME_COLUMN_NAME; use lifecycle::LifecycleWriteGuard; -use query::frontend::reorg::{self, ReorgPlanner}; +use query::frontend::reorg::ReorgPlanner; use query::QueryChunkMeta; use read_buffer::{ChunkMetrics, RBChunk}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; -use crate::db::catalog::chunk::{self, CatalogChunk}; -use crate::db::catalog::partition::{self, Partition}; +use crate::db::catalog::chunk::CatalogChunk; +use crate::db::catalog::partition::Partition; use crate::db::DbChunk; -use super::{LockableCatalogChunk, LockableCatalogPartition}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(context(false))] - PartitionError { source: partition::Error }, - - #[snafu(context(false))] - ChunkError { source: chunk::Error }, - - #[snafu(context(false))] - PlannerError { source: reorg::Error }, - - #[snafu(context(false))] - ArrowError { source: arrow::error::ArrowError }, - - #[snafu(context(false))] - DataFusionError { - source: datafusion::error::DataFusionError, - }, -} -pub type Result = std::result::Result; +use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition}; /// Compute a sort key that orders lower cardinality columns first /// @@ -76,7 +54,7 @@ fn compute_sort_key<'a>(summaries: impl Iterator) -> So /// Compact the provided chunks into a single chunk /// /// TODO: Replace low-level locks with transaction object -pub(super) fn compact_chunks( +pub(crate) fn compact_chunks( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition<'_>>, chunks: Vec>>, ) -> Result<( diff --git a/server/src/db/lifecycle/error.rs b/server/src/db/lifecycle/error.rs new file mode 100644 index 0000000000..d0594f32c6 --- /dev/null +++ b/server/src/db/lifecycle/error.rs @@ -0,0 +1,60 @@ +//! 
Errors that can occur during lifecycle actions +use snafu::Snafu; + +use data_types::chunk_metadata::ChunkAddr; + +use crate::db::catalog; + +#[derive(Debug, Snafu)] +// Export the snafu "selectors" so they can be used in other modules +#[snafu(visibility = "pub")] +pub enum Error { + #[snafu(context(false))] + PartitionError { source: catalog::partition::Error }, + + #[snafu(context(false))] + ChunkError { source: catalog::chunk::Error }, + + #[snafu(context(false))] + PlannerError { + source: query::frontend::reorg::Error, + }, + + #[snafu(context(false))] + ArrowError { source: arrow::error::ArrowError }, + + #[snafu(context(false))] + DataFusionError { + source: datafusion::error::DataFusionError, + }, + + #[snafu(display("Read Buffer Error in chunk {}:{} : {}", chunk_id, table_name, source))] + ReadBufferChunkError { + source: read_buffer::Error, + table_name: String, + chunk_id: u32, + }, + + #[snafu(display("Error writing to object store: {}", source))] + WritingToObjectStore { + source: parquet_file::storage::Error, + }, + + #[snafu(display("Error while creating parquet chunk: {}", source))] + ParquetChunkError { source: parquet_file::chunk::Error }, + + #[snafu(display("Error while handling transaction on preserved catalog: {}", source))] + TransactionError { + source: parquet_file::catalog::Error, + }, + + #[snafu(display("Error while committing transaction on preserved catalog: {}", source))] + CommitError { + source: parquet_file::catalog::Error, + }, + + #[snafu(display("Cannot write chunk: {}", addr))] + CannotWriteChunk { addr: ChunkAddr }, +} + +pub type Result = std::result::Result; diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs new file mode 100644 index 0000000000..0fbd6d1af8 --- /dev/null +++ b/server/src/db/lifecycle/move_chunk.rs @@ -0,0 +1,83 @@ +use crate::db::catalog::chunk::CatalogChunk; +pub(crate) use crate::db::chunk::DbChunk; +use ::lifecycle::LifecycleWriteGuard; +use data_types::job::Job; +use internal_types::{arrow::sort::sort_record_batch, selection::Selection}; + +use observability_deps::tracing::{debug, info}; +use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk}; +use std::{future::Future, sync::Arc}; +use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; + +use super::{error::Result, LockableCatalogChunk}; + +/// The implementation for moving a chunk to the read buffer +/// +/// Returns a future registered with the tracker registry, and the corresponding tracker +/// The caller can either spawn this future to tokio, or block directly on it +pub fn move_chunk_to_read_buffer( + mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, +) -> Result<( + TaskTracker, + TrackedFuture>> + Send>, +)> { + let db = guard.data().db; + let addr = guard.addr().clone(); + // TODO: Use ChunkAddr within Job + let (tracker, registration) = db.jobs.register(Job::CloseChunk { + db_name: addr.db_name.to_string(), + partition_key: addr.partition_key.to_string(), + table_name: addr.table_name.to_string(), + chunk_id: addr.chunk_id, + }); + + // update the catalog to say we are processing this chunk and + // then drop the lock while we do the work + let (mb_chunk, table_summary) = { + let mb_chunk = guard.set_moving(&registration)?; + (mb_chunk, guard.table_summary()) + }; + + // Drop locks + let chunk = guard.unwrap().chunk; + + // create a new read buffer chunk with memory tracking + let metrics = db + .metrics_registry + .register_domain_with_labels("read_buffer", db.metric_labels.clone()); + 
let mut rb_chunk = RBChunk::new( + &table_summary.name, + ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()), + ); + + let fut = async move { + info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer"); + + // load table into the new chunk one by one. + debug!(chunk=%addr, "loading table to read buffer"); + let batch = mb_chunk + .read_filter(Selection::All) + // It is probably reasonable to recover from this error + // (reset the chunk state to Open) but until that is + // implemented (and tested) just panic + .expect("Loading chunk to mutable buffer"); + + let sorted = sort_record_batch(batch).expect("failed to sort"); + rb_chunk.upsert_table(&table_summary.name, sorted); + + // Can drop and re-acquire as lifecycle action prevents concurrent modification + let mut guard = chunk.write(); + + // update the catalog to say we are done processing + guard + .set_moved(Arc::new(rb_chunk)) + .expect("failed to move chunk"); + + debug!(chunk=%addr, "chunk marked MOVED. loading complete"); + + Ok(DbChunk::snapshot(&guard)) + }; + + Ok((tracker, fut.track(registration))) +} diff --git a/server/src/db/lifecycle/unload.rs b/server/src/db/lifecycle/unload.rs new file mode 100644 index 0000000000..474b479346 --- /dev/null +++ b/server/src/db/lifecycle/unload.rs @@ -0,0 +1,24 @@ +//! This module contains the code to unload chunks from the read buffer + +use std::sync::Arc; + +use lifecycle::LifecycleWriteGuard; +use observability_deps::tracing::debug; + +use crate::db::{catalog::chunk::CatalogChunk, DbChunk}; + +use super::LockableCatalogChunk; + +use super::error::Result; + +pub fn unload_read_buffer_chunk( + mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, +) -> Result> { + debug!(chunk=%chunk.addr(), "unloading chunk from read buffer"); + + chunk.set_unload_from_read_buffer()?; + + debug!(chunk=%chunk.addr(), "chunk marked UNLOADED from read buffer"); + + Ok(DbChunk::snapshot(&chunk)) +} diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs new file mode 100644 index 0000000000..f687ad9528 --- /dev/null +++ b/server/src/db/lifecycle/write.rs @@ -0,0 +1,189 @@ +//! 
This module contains the code to write chunks to the object store +use crate::db::{ + catalog::chunk::{CatalogChunk, ChunkStage}, + checkpoint_data_from_catalog, + lifecycle::LockableCatalogChunk, + streams, DbChunk, +}; + +use ::lifecycle::LifecycleWriteGuard; + +use arrow::datatypes::SchemaRef as ArrowSchemaRef; +use chrono::Utc; +use data_types::job::Job; +use datafusion::physical_plan::SendableRecordBatchStream; +use internal_types::selection::Selection; +use object_store::path::parsed::DirsAndFileName; +use observability_deps::tracing::{debug, warn}; +use parquet_file::{ + chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk}, + metadata::IoxMetadata, + storage::Storage, +}; +use snafu::ResultExt; +use std::{future::Future, sync::Arc}; +use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; + +use super::error::{ + CommitError, Error, ParquetChunkError, Result, TransactionError, WritingToObjectStore, +}; + +/// The implementation for writing a chunk to the object store +/// +/// Returns a future registered with the tracker registry, and the corresponding tracker +/// The caller can either spawn this future to tokio, or block directly on it +pub fn write_chunk_to_object_store( + mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, +) -> Result<( + TaskTracker, + TrackedFuture>> + Send>, +)> { + let db = guard.data().db; + let addr = guard.addr().clone(); + + // TODO: Use ChunkAddr within Job + let (tracker, registration) = db.jobs.register(Job::WriteChunk { + db_name: addr.db_name.to_string(), + partition_key: addr.partition_key.to_string(), + table_name: addr.table_name.to_string(), + chunk_id: addr.chunk_id, + }); + + // update the catalog to say we are processing this chunk + let rb_chunk = guard.set_writing_to_object_store(&registration)?; + + debug!(chunk=%guard.addr(), "chunk marked WRITING, loading tables into object store"); + + // Create a storage to save data of this chunk + let storage = Storage::new(Arc::clone(&db.store), db.server_id); + + let catalog_transactions_until_checkpoint = db + .rules + .read() + .lifecycle_rules + .catalog_transactions_until_checkpoint + .get(); + + let preserved_catalog = Arc::clone(&db.preserved_catalog); + let catalog = Arc::clone(&db.catalog); + let object_store = Arc::clone(&db.store); + let cleanup_lock = Arc::clone(&db.cleanup_lock); + + // Drop locks + let chunk = guard.unwrap().chunk; + + let fut = async move { + debug!(chunk=%addr, "loading table to object store"); + + let predicate = read_buffer::Predicate::default(); + + // Get RecordBatchStream of data from the read buffer chunk + let read_results = rb_chunk.read_filter(&addr.table_name, predicate, Selection::All); + + let arrow_schema: ArrowSchemaRef = rb_chunk + .read_filter_table_schema(Selection::All) + .expect("read buffer is infallible") + .into(); + + let stream: SendableRecordBatchStream = Box::pin(streams::ReadFilterResultsStream::new( + read_results, + Arc::clone(&arrow_schema), + )); + + // check that the upcoming state change will very likely succeed + { + // re-lock + let guard = chunk.read(); + if matches!(guard.stage(), &ChunkStage::Persisted { .. 
}) + || !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting) + { + return Err(Error::CannotWriteChunk { + addr: guard.addr().clone(), + }); + } + } + + // catalog-level transaction for preservation layer + { + // fetch shared (= read) guard preventing the cleanup job from deleting our files + let _guard = cleanup_lock.read().await; + + // Write this table data into the object store + // + // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted + // between creation and the transaction commit. + let metadata = IoxMetadata { + creation_timestamp: Utc::now(), + table_name: addr.table_name.to_string(), + partition_key: addr.partition_key.to_string(), + chunk_id: addr.chunk_id, + }; + let (path, parquet_metadata) = storage + .write_to_object_store(addr, stream, metadata) + .await + .context(WritingToObjectStore)?; + let parquet_metadata = Arc::new(parquet_metadata); + + let metrics = catalog + .metrics_registry + .register_domain_with_labels("parquet", catalog.metric_labels.clone()); + let metrics = ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet()); + let parquet_chunk = Arc::new( + ParquetChunk::new( + path.clone(), + object_store, + Arc::clone(&parquet_metadata), + metrics, + ) + .context(ParquetChunkError)?, + ); + + let path: DirsAndFileName = path.into(); + + // IMPORTANT: Start transaction AFTER writing the actual parquet file so we do not hold the + // transaction lock (that is part of the PreservedCatalog) for too long. By using the + // cleanup lock (see above) it is ensured that the file that we have written is not deleted + // in between. + let mut transaction = preserved_catalog.open_transaction().await; + transaction + .add_parquet(&path, &parquet_metadata) + .context(TransactionError)?; + + // preserved commit + let ckpt_handle = transaction.commit().await.context(CommitError)?; + + // in-mem commit + { + let mut guard = chunk.write(); + if let Err(e) = guard.set_written_to_object_store(parquet_chunk) { + panic!("Chunk written but cannot mark as written {}", e); + } + } + + let create_checkpoint = + ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0; + if create_checkpoint { + // Commit is already done, so we can just scan the catalog for the state. + // + // NOTE: There can only be a single transaction in this section because the checkpoint handle holds + // transaction lock. Therefore we don't need to worry about concurrent modifications of + // preserved chunks. + if let Err(e) = ckpt_handle + .create_checkpoint(checkpoint_data_from_catalog(&catalog)) + .await + { + warn!(%e, "cannot create catalog checkpoint"); + + // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed + // (both in-mem and within the preserved catalog). 
+ } + } + } + + // We know this chunk is ParquetFile type + let chunk = chunk.read(); + Ok(DbChunk::parquet_file_snapshot(&chunk)) + }; + + Ok((tracker, fut.track(registration))) +} diff --git a/server_benchmarks/benches/catalog_persistence.rs b/server_benchmarks/benches/catalog_persistence.rs index 6734bc6ac9..8ec5506246 100644 --- a/server_benchmarks/benches/catalog_persistence.rs +++ b/server_benchmarks/benches/catalog_persistence.rs @@ -80,7 +80,7 @@ async fn setup(object_store: Arc, done: &Mutex) { .await .unwrap(); - db.load_chunk_to_read_buffer(&table_name, partition_key, chunk_id) + db.move_chunk_to_read_buffer(&table_name, partition_key, chunk_id) .await .unwrap(); diff --git a/src/influxdb_ioxd/rpc/flight.rs b/src/influxdb_ioxd/rpc/flight.rs index b5580b6bdc..589eb6904b 100644 --- a/src/influxdb_ioxd/rpc/flight.rs +++ b/src/influxdb_ioxd/rpc/flight.rs @@ -2,7 +2,7 @@ use std::{pin::Pin, sync::Arc}; use futures::Stream; -use observability_deps::tracing::error; +use observability_deps::tracing::{info, warn}; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use tonic::{Interceptor, Request, Response, Streaming}; @@ -70,7 +70,20 @@ impl From for tonic::Status { /// Converts a result from the business logic into the appropriate tonic /// status fn from(err: Error) -> Self { - error!("Error handling Flight gRPC request: {}", err); + // An explicit match on the Error enum will ensure appropriate + // logging is handled for any new error variants. + let msg = "Error handling Flight gRPC request"; + match err { + Error::DatabaseNotFound { .. } + | Error::InvalidTicket { .. } + | Error::InvalidQuery { .. } + // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development + | Error::InvalidDatabaseName { .. } => info!(?err, msg), + Error::Query { .. } => info!(?err, msg), + Error::DictionaryError { .. } + | Error::InvalidRecordBatch { .. } + | Error::Planning { .. 
} => warn!(?err, msg), + } err.to_status() } } diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index f8a58fcfd9..bb08c26bcd 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -2762,16 +2762,16 @@ mod tests { } } - /// loop and try to make a client connection for 5 seconds, + /// loop and try to make a client connection for 30 seconds, /// returning the result of the connection async fn connect_to_server(bind_addr: SocketAddr) -> Result where T: NewClient, { - const MAX_RETRIES: u32 = 10; + const MAX_RETRIES: u32 = 30; let mut retry_count = 0; loop { - let mut interval = tokio::time::interval(Duration::from_millis(500)); + let mut interval = tokio::time::interval(Duration::from_millis(1000)); match T::connect(format!("http://{}", bind_addr)).await { Ok(client) => { diff --git a/tests/common/server_fixture.rs b/tests/common/server_fixture.rs index e2f2b1a491..fa1dddd0a2 100644 --- a/tests/common/server_fixture.rs +++ b/tests/common/server_fixture.rs @@ -356,7 +356,7 @@ impl TestServer { let try_http_connect = async { let client = reqwest::Client::new(); let url = format!("{}/health", self.addrs().http_base); - let mut interval = tokio::time::interval(Duration::from_millis(500)); + let mut interval = tokio::time::interval(Duration::from_millis(1000)); loop { match client.get(&url).send().await { Ok(resp) => { @@ -373,7 +373,7 @@ impl TestServer { let pair = future::join(try_http_connect, try_grpc_connect); - let capped_check = tokio::time::timeout(Duration::from_secs(3), pair); + let capped_check = tokio::time::timeout(Duration::from_secs(30), pair); match capped_check.await { Ok(_) => { @@ -412,7 +412,7 @@ impl TestServer { // if server ID was set, we can also wait until DBs are loaded let check_dbs_loaded = async { - let mut interval = tokio::time::interval(Duration::from_millis(500)); + let mut interval = tokio::time::interval(Duration::from_millis(1000)); while !management_client .get_server_status() @@ -424,7 +424,7 @@ impl TestServer { } }; - let capped_check = tokio::time::timeout(Duration::from_secs(3), check_dbs_loaded); + let capped_check = tokio::time::timeout(Duration::from_secs(30), check_dbs_loaded); match capped_check.await { Ok(_) => { @@ -464,7 +464,7 @@ pub async fn grpc_channel( } pub async fn wait_for_grpc(addrs: &BindAddresses) { - let mut interval = tokio::time::interval(Duration::from_millis(500)); + let mut interval = tokio::time::interval(Duration::from_millis(1000)); loop { match grpc_channel(addrs).await {