diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs
index 2fa84f45fe..596cb1728d 100644
--- a/lifecycle/src/lib.rs
+++ b/lifecycle/src/lib.rs
@@ -49,22 +49,19 @@ impl ChunkLifecycleAction {
 }
 
 /// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
-///
-/// Note: This trait is meant to be implemented for references to types allowing them
-/// to yield `LifecycleDb::Chunk` with non-static lifetimes
 pub trait LifecycleDb {
     type Chunk: LockableChunk;
     type Partition: LockablePartition;
 
     /// Return the in-memory size of the database. We expect this
     /// to change from call to call as chunks are dropped
-    fn buffer_size(self) -> usize;
+    fn buffer_size(&self) -> usize;
 
     /// Returns the lifecycle policy
-    fn rules(self) -> LifecycleRules;
+    fn rules(&self) -> LifecycleRules;
 
     /// Returns a list of lockable partitions in the database
-    fn partitions(self) -> Vec<Self::Partition>;
+    fn partitions(&self) -> Vec<Self::Partition>;
 }
 
 /// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 7f196d7942..7e33642551 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -22,22 +22,22 @@ pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
 ///
 /// `LifecyclePolicy::check_for_work` can then be used to drive progress
 /// of the `LifecycleChunk` contained within this `LifecycleDb`
-pub struct LifecyclePolicy<'a, M>
+pub struct LifecyclePolicy<M>
 where
-    &'a M: LifecycleDb,
+    M: LifecycleDb,
 {
     /// The `LifecycleDb` this policy is automating
-    db: &'a M,
+    db: M,
 
     /// Background tasks spawned by this `LifecyclePolicy`
     trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
 }
 
-impl<'a, M> LifecyclePolicy<'a, M>
+impl<M> LifecyclePolicy<M>
 where
-    &'a M: LifecycleDb,
+    M: LifecycleDb,
 {
-    pub fn new(db: &'a M) -> Self {
+    pub fn new(db: M) -> Self {
         Self {
             db,
             trackers: vec![],
@@ -423,9 +423,9 @@ where
     }
 }
 
-impl<'a, M> Debug for LifecyclePolicy<'a, M>
+impl<M> Debug for LifecyclePolicy<M>
 where
-    &'a M: LifecycleDb,
+    M: LifecycleDb + Copy,
 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "LifecyclePolicy{{..}}")
@@ -813,7 +813,7 @@ mod tests {
         type Chunk = TestLockableChunk<'a>;
         type Partition = TestLockablePartition<'a>;
 
-        fn buffer_size(self) -> usize {
+        fn buffer_size(&self) -> usize {
             // All chunks are 20 bytes
             self.partitions
                 .read()
@@ -822,11 +822,11 @@
                 .sum()
         }
 
-        fn rules(self) -> LifecycleRules {
+        fn rules(&self) -> LifecycleRules {
            self.rules.clone()
        }
 
-        fn partitions(self) -> Vec<Self::Partition> {
+        fn partitions(&self) -> Vec<Self::Partition> {
            self.partitions
               .read()
               .iter()
diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs
index 18e1249ae0..9d15c2aa02 100644
--- a/query_tests/src/influxrpc/field_columns.rs
+++ b/query_tests/src/influxrpc/field_columns.rs
@@ -32,7 +32,7 @@ macro_rules!
run_field_columns_test_case { let executor = db.executor(); let plan = planner - .field_columns(&db, predicate.clone()) + .field_columns(db.as_ref(), predicate.clone()) .expect("built plan successfully"); let fields = executor .to_field_list(plan) @@ -132,7 +132,7 @@ async fn test_field_name_plan() { let planner = InfluxRpcPlanner::new(); let plan = planner - .field_columns(&db, predicate.clone()) + .field_columns(db.as_ref(), predicate.clone()) .expect("built plan successfully"); let mut plans = plan.plans; diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 7c368e170e..cd370e25ec 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -50,7 +50,7 @@ macro_rules! run_read_filter_test_case { let planner = InfluxRpcPlanner::new(); let plan = planner - .read_filter(&db, predicate.clone()) + .read_filter(db.as_ref(), predicate.clone()) .expect("built plan successfully"); let string_results = run_series_set_plan(db.executor(), plan).await; diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 2a60224a03..4a910a02fe 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -29,7 +29,7 @@ macro_rules! run_read_group_test_case { let planner = InfluxRpcPlanner::new(); let plans = planner - .read_group(&db, predicate.clone(), agg, &group_columns) + .read_group(db.as_ref(), predicate.clone(), agg, &group_columns) .expect("built plan successfully"); let plans = plans.into_inner(); diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index a66b6cc706..e467fbfe55 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -32,7 +32,13 @@ macro_rules! run_read_window_aggregate_test_case { let planner = InfluxRpcPlanner::new(); let plans = planner - .read_window_aggregate(&db, predicate.clone(), agg, every.clone(), offset.clone()) + .read_window_aggregate( + db.as_ref(), + predicate.clone(), + agg, + every.clone(), + offset.clone(), + ) .expect("built plan successfully"); let plans = plans.into_inner(); diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index 38fe95980b..b40388b1b1 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -22,7 +22,7 @@ macro_rules! run_table_names_test_case { let planner = InfluxRpcPlanner::new(); let plan = planner - .table_names(&db, predicate.clone()) + .table_names(db.as_ref(), predicate.clone()) .expect("built plan successfully"); let names = db .executor() diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 040d83ba32..c60c3b371a 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -26,7 +26,7 @@ macro_rules! run_tag_keys_test_case { let planner = InfluxRpcPlanner::new(); let plan = planner - .tag_keys(&db, predicate.clone()) + .tag_keys(db.as_ref(), predicate.clone()) .expect("built plan successfully"); let names = db .executor() diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index 351c1e7803..024da26c11 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -24,7 +24,7 @@ macro_rules! 
run_tag_values_test_case { let planner = InfluxRpcPlanner::new(); let plan = planner - .tag_values(&db, &tag_name, predicate.clone()) + .tag_values(db.as_ref(), &tag_name, predicate.clone()) .expect("built plan successfully"); let names = db .executor() @@ -242,7 +242,7 @@ async fn list_tag_values_field_col() { // Test: temp is a field, not a tag let tag_name = "temp"; - let plan_result = planner.tag_values(&db, &tag_name, predicate.clone()); + let plan_result = planner.tag_values(db.as_ref(), &tag_name, predicate.clone()); assert_eq!( plan_result.unwrap_err().to_string(), diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index aae58e4618..3bc4ffeb97 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -58,7 +58,6 @@ async fn chunk_pruning_sql() { db, metric_registry, } = setup().await; - let db = Arc::new(db); let expected = vec![ "+-----+-------------------------------+", @@ -114,7 +113,6 @@ async fn chunk_pruning_influxrpc() { db, metric_registry, } = setup().await; - let db = Arc::new(db); let predicate = PredicateBuilder::new() // bar < 3.0 diff --git a/query_tests/src/runner.rs b/query_tests/src/runner.rs index 0e18bc27b2..5e87754a8f 100644 --- a/query_tests/src/runner.rs +++ b/query_tests/src/runner.rs @@ -10,7 +10,6 @@ use std::{ io::BufWriter, io::Write, path::{Path, PathBuf}, - sync::Arc, }; use self::{parse::TestQueries, setup::TestSetup}; @@ -239,7 +238,6 @@ impl Runner { let DbScenario { scenario_name, db, .. } = scenario; - let db = Arc::new(db); writeln!(self.log, "Running scenario '{}'", scenario_name)?; writeln!(self.log, "SQL: '{:#?}'", sql)?; diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 439885b619..0429172b2e 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -19,7 +19,7 @@ use server::{db::test_helpers::write_lp, Db}; #[derive(Debug)] pub struct DbScenario { pub scenario_name: String, - pub db: Db, + pub db: Arc, } #[async_trait] @@ -853,7 +853,7 @@ pub async fn make_two_chunk_scenarios( } /// Rollover the mutable buffer and load chunk 0 to the read buffer and object store -pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) { +pub async fn rollover_and_load(db: &Arc, partition_key: &str, table_name: &str) { db.rollover_partition(table_name, partition_key) .await .unwrap(); diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 332653adb0..b603af66fc 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -7,7 +7,6 @@ use super::scenarios::*; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_sorted_eq; use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner}; -use std::sync::Arc; /// runs table_names(predicate) and compares it to the expected /// output @@ -19,7 +18,6 @@ macro_rules! run_sql_test_case { let DbScenario { scenario_name, db, .. } = scenario; - let db = Arc::new(db); println!("Running scenario '{}'", scenario_name); println!("SQL: '{:#?}'", sql); diff --git a/server/src/db.rs b/server/src/db.rs index c0ce21d685..18adfbf2ab 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2,6 +2,7 @@ //! 
instances of the mutable buffer, read buffer, and object store pub(crate) use crate::db::chunk::DbChunk; +use crate::db::lifecycle::ArcDb; use crate::{ db::{ access::QueryCatalogAccess, @@ -331,13 +332,16 @@ impl Db { } pub fn lockable_chunk( - &self, + self: &Arc, table_name: &str, partition_key: &str, chunk_id: u32, - ) -> catalog::Result> { + ) -> catalog::Result { let chunk = self.chunk(table_name, partition_key, chunk_id)?; - Ok(LockableCatalogChunk { db: self, chunk }) + Ok(LockableCatalogChunk { + db: Arc::clone(self), + chunk, + }) } /// Drops the specified chunk from the catalog and all storage systems @@ -359,7 +363,7 @@ impl Db { /// /// Returns a handle to the newly loaded chunk in the read buffer pub async fn move_chunk_to_read_buffer( - &self, + self: &Arc, table_name: &str, partition_key: &str, chunk_id: u32, @@ -380,7 +384,7 @@ impl Db { /// /// Returns a handle to the newly created chunk in the read buffer pub async fn compact_partition( - &self, + self: &Arc, table_name: &str, partition_key: &str, ) -> Result> { @@ -389,7 +393,7 @@ impl Db { let fut = { let partition = self.partition(table_name, partition_key)?; let partition = LockableCatalogPartition { - db: self, + db: Arc::clone(&self), partition, }; @@ -412,7 +416,7 @@ impl Db { /// Write given table of a given chunk to object store. /// The writing only happen if that chunk already in read buffer pub async fn write_chunk_to_object_store( - &self, + self: &Arc, table_name: &str, partition_key: &str, chunk_id: u32, @@ -425,7 +429,7 @@ impl Db { /// Unload chunk from read buffer but keep it in object store pub fn unload_read_buffer( - &self, + self: &Arc, table_name: &str, partition_key: &str, chunk_id: u32, @@ -493,7 +497,7 @@ impl Db { tokio::join!( // lifecycle policy loop async { - let mut policy = ::lifecycle::LifecyclePolicy::new(&self); + let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(&self))); while !shutdown.is_cancelled() { self.worker_iterations_lifecycle @@ -911,18 +915,17 @@ mod tests { // Writes should be forwarded to the write buffer *and* the mutable buffer if both are // configured. let write_buffer = Arc::new(MockBuffer::default()); - let test_db = TestDb::builder() + let db = TestDb::builder() .write_buffer(Arc::clone(&write_buffer) as _) .build() .await .db; let entry = lp_to_entry("cpu bar=1 10"); - test_db.store_entry(entry).await.unwrap(); + db.store_entry(entry).await.unwrap(); assert_eq!(write_buffer.entries.lock().unwrap().len(), 1); - let db = Arc::new(test_db); let batches = run_query(db, "select * from cpu").await; let expected = vec![ @@ -938,7 +941,7 @@ mod tests { #[tokio::test] async fn read_write() { // This test also exercises the path without a write buffer. 
- let db = Arc::new(make_db().await.db); + let db = make_db().await.db; write_lp(&db, "cpu bar=1 10").await; let batches = run_query(db, "select * from cpu").await; @@ -955,7 +958,7 @@ mod tests { #[tokio::test] async fn try_all_partition_writes_when_some_fail() { - let db = Arc::new(make_db().await.db); + let db = make_db().await.db; let nanoseconds_per_hour = 60 * 60 * 1_000_000_000u64; @@ -1032,7 +1035,7 @@ mod tests { #[tokio::test] async fn metrics_during_rollover() { let test_db = make_db().await; - let db = Arc::new(test_db.db); + let db = test_db.db; write_lp(db.as_ref(), "cpu bar=1 10").await; @@ -1172,7 +1175,7 @@ mod tests { #[tokio::test] async fn write_with_rollover() { - let db = Arc::new(make_db().await.db); + let db = make_db().await.db; write_lp(db.as_ref(), "cpu bar=1 10").await; assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap()); @@ -2511,7 +2514,7 @@ mod tests { // replay) let mut chunks = vec![]; for _ in 0..2 { - chunks.push(create_parquet_chunk(db.as_ref()).await); + chunks.push(create_parquet_chunk(&db).await); } // ==================== check: catalog state ==================== @@ -2594,7 +2597,7 @@ mod tests { // 2: dropped (not in current catalog but parquet file still present for time travel) let mut paths_keep = vec![]; for i in 0..3i8 { - let (table_name, partition_key, chunk_id) = create_parquet_chunk(db.as_ref()).await; + let (table_name, partition_key, chunk_id) = create_parquet_chunk(&db).await; let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap(); let chunk = chunk.read(); if let ChunkStage::Persisted { parquet, .. } = chunk.stage() { @@ -2686,7 +2689,7 @@ mod tests { // replay) let mut chunks = vec![]; for _ in 0..2 { - chunks.push(create_parquet_chunk(db.as_ref()).await); + chunks.push(create_parquet_chunk(&db).await); } // ==================== do: remove .txn files ==================== @@ -2739,7 +2742,7 @@ mod tests { write_lp(db.as_ref(), "cpu bar=1 10").await; } - async fn create_parquet_chunk(db: &Db) -> (String, String, u32) { + async fn create_parquet_chunk(db: &Arc) -> (String, String, u32) { write_lp(db, "cpu bar=1 10").await; let partition_key = "1970-01-01T00"; let table_name = "cpu"; diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index a6f463d694..f1f92fd229 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -34,6 +34,18 @@ mod move_chunk; mod unload; mod write; +/// A newtype wrapper around `Arc` to workaround trait orphan rules +#[derive(Debug, Clone)] +pub struct ArcDb(pub(super) Arc); + +impl std::ops::Deref for ArcDb { + type Target = Db; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db` /// @@ -42,12 +54,12 @@ mod write; /// without allowing concurrent modification /// #[derive(Debug, Clone)] -pub struct LockableCatalogChunk<'a> { - pub db: &'a Db, +pub struct LockableCatalogChunk { + pub db: Arc, pub chunk: Arc>, } -impl<'a> LockableChunk for LockableCatalogChunk<'a> { +impl LockableChunk for LockableCatalogChunk { type Chunk = CatalogChunk; type Job = Job; @@ -98,15 +110,15 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> { /// without allowing concurrent modification /// #[derive(Debug, Clone)] -pub struct LockableCatalogPartition<'a> { - pub db: &'a Db, +pub struct LockableCatalogPartition { + pub db: Arc, pub partition: Arc>, } -impl<'a> LockablePartition for LockableCatalogPartition<'a> { +impl LockablePartition for LockableCatalogPartition { type Partition = 
Partition; - type Chunk = LockableCatalogChunk<'a>; + type Chunk = LockableCatalogChunk; type Error = super::lifecycle::Error; @@ -123,19 +135,18 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> { chunk_id: u32, ) -> Option { s.chunk(chunk_id).map(|chunk| LockableCatalogChunk { - db: s.data().db, + db: Arc::clone(&s.data().db), chunk: Arc::clone(chunk), }) } fn chunks(s: &LifecycleReadGuard<'_, Self::Partition, Self>) -> Vec<(u32, Self::Chunk)> { - let db = s.data().db; s.keyed_chunks() .map(|(id, chunk)| { ( id, LockableCatalogChunk { - db, + db: Arc::clone(&s.data().db), chunk: Arc::clone(chunk), }, ) @@ -162,24 +173,24 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> { } } -impl<'a> LifecycleDb for &'a Db { - type Chunk = LockableCatalogChunk<'a>; - type Partition = LockableCatalogPartition<'a>; +impl LifecycleDb for ArcDb { + type Chunk = LockableCatalogChunk; + type Partition = LockableCatalogPartition; - fn buffer_size(self) -> usize { + fn buffer_size(&self) -> usize { self.catalog.metrics().memory().total() } - fn rules(self) -> LifecycleRules { + fn rules(&self) -> LifecycleRules { self.rules.read().lifecycle_rules.clone() } - fn partitions(self) -> Vec { + fn partitions(&self) -> Vec { self.catalog .partitions() .into_iter() .map(|partition| LockableCatalogPartition { - db: self, + db: Arc::clone(&self.0), partition, }) .collect() diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index 06e0b5fb4d..669591613e 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -25,13 +25,13 @@ use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition}; /// /// TODO: Replace low-level locks with transaction object pub(crate) fn compact_chunks( - partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition<'_>>, - chunks: Vec>>, + partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, + chunks: Vec>, ) -> Result<( TaskTracker, TrackedFuture>> + Send>, )> { - let db = partition.data().db; + let db = Arc::clone(&partition.data().db); let table_name = partition.table_name().to_string(); let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect(); @@ -47,7 +47,7 @@ pub(crate) fn compact_chunks( .into_iter() .map(|mut chunk| { // Sanity-check - assert!(std::ptr::eq(db, chunk.data().db)); + assert!(Arc::ptr_eq(&db, &chunk.data().db)); assert_eq!(chunk.table_name().as_ref(), table_name.as_str()); chunk.set_compacting(®istration)?; diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs index 4101779fc4..14e0839baa 100644 --- a/server/src/db/lifecycle/move_chunk.rs +++ b/server/src/db/lifecycle/move_chunk.rs @@ -17,12 +17,12 @@ use super::{error::Result, LockableCatalogChunk}; /// Returns a future registered with the tracker registry, and the corresponding tracker /// The caller can either spawn this future to tokio, or block directly on it pub fn move_chunk_to_read_buffer( - mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, + mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>, ) -> Result<( TaskTracker, TrackedFuture>> + Send>, )> { - let db = guard.data().db; + let db = Arc::clone(&guard.data().db); let addr = guard.addr().clone(); // TODO: Use ChunkAddr within Job let (tracker, registration) = db.jobs.register(Job::CloseChunk { diff --git a/server/src/db/lifecycle/unload.rs b/server/src/db/lifecycle/unload.rs index 474b479346..f2b21d8472 100644 --- 
a/server/src/db/lifecycle/unload.rs +++ b/server/src/db/lifecycle/unload.rs @@ -12,7 +12,7 @@ use super::LockableCatalogChunk; use super::error::Result; pub fn unload_read_buffer_chunk( - mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, + mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>, ) -> Result> { debug!(chunk=%chunk.addr(), "unloading chunk from read buffer"); diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs index f687ad9528..c43e03be24 100644 --- a/server/src/db/lifecycle/write.rs +++ b/server/src/db/lifecycle/write.rs @@ -33,12 +33,12 @@ use super::error::{ /// Returns a future registered with the tracker registry, and the corresponding tracker /// The caller can either spawn this future to tokio, or block directly on it pub fn write_chunk_to_object_store( - mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>, + mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>, ) -> Result<( TaskTracker, TrackedFuture>> + Send>, )> { - let db = guard.data().db; + let db = Arc::clone(&guard.data().db); let addr = guard.addr().clone(); // TODO: Use ChunkAddr within Job @@ -64,11 +64,6 @@ pub fn write_chunk_to_object_store( .catalog_transactions_until_checkpoint .get(); - let preserved_catalog = Arc::clone(&db.preserved_catalog); - let catalog = Arc::clone(&db.catalog); - let object_store = Arc::clone(&db.store); - let cleanup_lock = Arc::clone(&db.cleanup_lock); - // Drop locks let chunk = guard.unwrap().chunk; @@ -106,7 +101,7 @@ pub fn write_chunk_to_object_store( // catalog-level transaction for preservation layer { // fetch shared (= read) guard preventing the cleanup job from deleting our files - let _guard = cleanup_lock.read().await; + let _guard = db.cleanup_lock.read().await; // Write this table data into the object store // @@ -124,14 +119,16 @@ pub fn write_chunk_to_object_store( .context(WritingToObjectStore)?; let parquet_metadata = Arc::new(parquet_metadata); - let metrics = catalog + let metrics = db + .catalog .metrics_registry - .register_domain_with_labels("parquet", catalog.metric_labels.clone()); - let metrics = ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet()); + .register_domain_with_labels("parquet", db.catalog.metric_labels.clone()); + let metrics = + ParquetChunkMetrics::new(&metrics, db.catalog.metrics().memory().parquet()); let parquet_chunk = Arc::new( ParquetChunk::new( path.clone(), - object_store, + Arc::clone(&db.store), Arc::clone(&parquet_metadata), metrics, ) @@ -144,7 +141,7 @@ pub fn write_chunk_to_object_store( // transaction lock (that is part of the PreservedCatalog) for too long. By using the // cleanup lock (see above) it is ensured that the file that we have written is not deleted // in between. - let mut transaction = preserved_catalog.open_transaction().await; + let mut transaction = db.preserved_catalog.open_transaction().await; transaction .add_parquet(&path, &parquet_metadata) .context(TransactionError)?; @@ -169,7 +166,7 @@ pub fn write_chunk_to_object_store( // transaction lock. Therefore we don't need to worry about concurrent modifications of // preserved chunks. 
         if let Err(e) = ckpt_handle
-            .create_checkpoint(checkpoint_data_from_catalog(&catalog))
+            .create_checkpoint(checkpoint_data_from_catalog(&db.catalog))
             .await
         {
             warn!(%e, "cannot create catalog checkpoint");
diff --git a/server/src/utils.rs b/server/src/utils.rs
index bff735deef..277658949a 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -19,7 +19,7 @@ use crate::{
 // of a Db and its metrics.
 #[derive(Debug)]
 pub struct TestDb {
-    pub db: Db,
+    pub db: Arc<Db>,
     pub metric_registry: metrics::TestMetricRegistry,
 }
 
@@ -92,7 +92,7 @@ impl TestDbBuilder {
 
         TestDb {
             metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
-            db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
+            db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))),
         }
     }
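
The server-side changes above all follow one pattern: `LifecyclePolicy` now owns its database handle instead of borrowing it, and the trait orphan rules are worked around by implementing `LifecycleDb` for a local newtype wrapping `Arc<Db>` (`ArcDb`) rather than for `&'a Db`. Below is a minimal, self-contained sketch of that shape, not the real IOx API; the names `Lifecycle`, `Database`, `SharedDb`, and `Policy` are placeholders.

```rust
use std::{ops::Deref, sync::Arc};

/// Stand-in for the `LifecycleDb` trait: `&self` receivers let an owned
/// (non-borrowed) handle implement it.
trait Lifecycle {
    fn buffer_size(&self) -> usize;
}

/// Stand-in for `Db`.
#[derive(Debug, Default)]
struct Database {
    bytes: usize,
}

/// Newtype playing the role of `ArcDb`: it owns an `Arc<Database>`,
/// so the policy that stores it needs no lifetime parameter.
#[derive(Debug, Clone)]
struct SharedDb(Arc<Database>);

impl Deref for SharedDb {
    type Target = Database;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Lifecycle for SharedDb {
    fn buffer_size(&self) -> usize {
        // Deref lets the wrapper reach `Database` fields directly.
        self.bytes
    }
}

/// Stand-in for `LifecyclePolicy<M>`: it owns `M` by value.
struct Policy<M: Lifecycle> {
    db: M,
}

impl<M: Lifecycle> Policy<M> {
    fn new(db: M) -> Self {
        Self { db }
    }

    fn check_for_work(&self) -> usize {
        self.db.buffer_size()
    }
}

fn main() {
    let db = Arc::new(Database { bytes: 42 });
    // Mirrors `LifecyclePolicy::new(ArcDb(Arc::clone(&self)))` in the diff.
    let policy = Policy::new(SharedDb(Arc::clone(&db)));
    assert_eq!(policy.check_for_work(), 42);
}
```

The background worker in `db.rs` does exactly this: it clones the `Arc<Db>` into `ArcDb` once and hands ownership to the policy, so the policy and the futures it spawns no longer have to outlive a borrow of `Db`.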
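Relatedly, the `Db` methods that spawn lifecycle work now take an `Arc`-based receiver (shown as `self: &Arc` in the diff) so they can clone the `Arc` into `LockableCatalogChunk`/`LockableCatalogPartition` and into the returned futures, and the compaction sanity check moves from `std::ptr::eq` on borrowed references to `Arc::ptr_eq` on the owned handles. A small sketch of that receiver pattern, again with made-up names rather than the IOx signatures:

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Db {
    name: String,
}

/// Hypothetical handle owning a clone of the database `Arc`, standing in
/// for `LockableCatalogChunk { db: Arc<Db>, .. }`.
#[derive(Debug, Clone)]
struct Handle {
    db: Arc<Db>,
}

impl Db {
    /// An `Arc` receiver: callable only through an `Arc<Db>`, which is what
    /// makes `Arc::clone(self)` possible instead of borrowing `&Db`.
    fn handle(self: &Arc<Self>) -> Handle {
        Handle {
            db: Arc::clone(self),
        }
    }
}

fn main() {
    let db = Arc::new(Db {
        name: "placeholder".to_string(),
    });
    let a = db.handle();
    let b = db.handle();
    // With owned `Arc`s, identity checks use `Arc::ptr_eq` rather than `std::ptr::eq`.
    assert!(Arc::ptr_eq(&a.db, &b.db));
    println!("both handles share {}", db.name);
}
```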