refactor: use Arc<Db> in lifecycle actions (#1873)

* refactor: use Arc<Db> in lifecycle actions

* chore: review feedback
pull/24376/head
Raphael Taylor-Davies 2021-07-01 20:56:33 +01:00 committed by GitHub
parent 56c8c8d428
commit 5b00bc69e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 102 additions and 94 deletions

View File

@ -49,22 +49,19 @@ impl ChunkLifecycleAction {
}
/// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
///
/// Note: This trait is meant to be implemented for references to types allowing them
/// to yield `LifecycleDb::Chunk` with non-static lifetimes
pub trait LifecycleDb {
type Chunk: LockableChunk;
type Partition: LockablePartition;
/// Return the in-memory size of the database. We expect this
/// to change from call to call as chunks are dropped
fn buffer_size(self) -> usize;
fn buffer_size(&self) -> usize;
/// Returns the lifecycle policy
fn rules(self) -> LifecycleRules;
fn rules(&self) -> LifecycleRules;
/// Returns a list of lockable partitions in the database
fn partitions(self) -> Vec<Self::Partition>;
fn partitions(&self) -> Vec<Self::Partition>;
}
/// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows

View File

@ -22,22 +22,22 @@ pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
///
/// `LifecyclePolicy::check_for_work` can then be used to drive progress
/// of the `LifecycleChunk` contained within this `LifecycleDb`
pub struct LifecyclePolicy<'a, M>
pub struct LifecyclePolicy<M>
where
&'a M: LifecycleDb,
M: LifecycleDb,
{
/// The `LifecycleDb` this policy is automating
db: &'a M,
db: M,
/// Background tasks spawned by this `LifecyclePolicy`
trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
}
impl<'a, M> LifecyclePolicy<'a, M>
impl<M> LifecyclePolicy<M>
where
&'a M: LifecycleDb,
M: LifecycleDb,
{
pub fn new(db: &'a M) -> Self {
pub fn new(db: M) -> Self {
Self {
db,
trackers: vec![],
@ -423,9 +423,9 @@ where
}
}
impl<'a, M> Debug for LifecyclePolicy<'a, M>
impl<M> Debug for LifecyclePolicy<M>
where
&'a M: LifecycleDb,
M: LifecycleDb + Copy,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LifecyclePolicy{{..}}")
@ -813,7 +813,7 @@ mod tests {
type Chunk = TestLockableChunk<'a>;
type Partition = TestLockablePartition<'a>;
fn buffer_size(self) -> usize {
fn buffer_size(&self) -> usize {
// All chunks are 20 bytes
self.partitions
.read()
@ -822,11 +822,11 @@ mod tests {
.sum()
}
fn rules(self) -> LifecycleRules {
fn rules(&self) -> LifecycleRules {
self.rules.clone()
}
fn partitions(self) -> Vec<Self::Partition> {
fn partitions(&self) -> Vec<Self::Partition> {
self.partitions
.read()
.iter()

View File

@ -32,7 +32,7 @@ macro_rules! run_field_columns_test_case {
let executor = db.executor();
let plan = planner
.field_columns(&db, predicate.clone())
.field_columns(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let fields = executor
.to_field_list(plan)
@ -132,7 +132,7 @@ async fn test_field_name_plan() {
let planner = InfluxRpcPlanner::new();
let plan = planner
.field_columns(&db, predicate.clone())
.field_columns(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let mut plans = plan.plans;

View File

@ -50,7 +50,7 @@ macro_rules! run_read_filter_test_case {
let planner = InfluxRpcPlanner::new();
let plan = planner
.read_filter(&db, predicate.clone())
.read_filter(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let string_results = run_series_set_plan(db.executor(), plan).await;

View File

@ -29,7 +29,7 @@ macro_rules! run_read_group_test_case {
let planner = InfluxRpcPlanner::new();
let plans = planner
.read_group(&db, predicate.clone(), agg, &group_columns)
.read_group(db.as_ref(), predicate.clone(), agg, &group_columns)
.expect("built plan successfully");
let plans = plans.into_inner();

View File

@ -32,7 +32,13 @@ macro_rules! run_read_window_aggregate_test_case {
let planner = InfluxRpcPlanner::new();
let plans = planner
.read_window_aggregate(&db, predicate.clone(), agg, every.clone(), offset.clone())
.read_window_aggregate(
db.as_ref(),
predicate.clone(),
agg,
every.clone(),
offset.clone(),
)
.expect("built plan successfully");
let plans = plans.into_inner();

View File

@ -22,7 +22,7 @@ macro_rules! run_table_names_test_case {
let planner = InfluxRpcPlanner::new();
let plan = planner
.table_names(&db, predicate.clone())
.table_names(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let names = db
.executor()

View File

@ -26,7 +26,7 @@ macro_rules! run_tag_keys_test_case {
let planner = InfluxRpcPlanner::new();
let plan = planner
.tag_keys(&db, predicate.clone())
.tag_keys(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let names = db
.executor()

View File

@ -24,7 +24,7 @@ macro_rules! run_tag_values_test_case {
let planner = InfluxRpcPlanner::new();
let plan = planner
.tag_values(&db, &tag_name, predicate.clone())
.tag_values(db.as_ref(), &tag_name, predicate.clone())
.expect("built plan successfully");
let names = db
.executor()
@ -242,7 +242,7 @@ async fn list_tag_values_field_col() {
// Test: temp is a field, not a tag
let tag_name = "temp";
let plan_result = planner.tag_values(&db, &tag_name, predicate.clone());
let plan_result = planner.tag_values(db.as_ref(), &tag_name, predicate.clone());
assert_eq!(
plan_result.unwrap_err().to_string(),

View File

@ -58,7 +58,6 @@ async fn chunk_pruning_sql() {
db,
metric_registry,
} = setup().await;
let db = Arc::new(db);
let expected = vec![
"+-----+-------------------------------+",
@ -114,7 +113,6 @@ async fn chunk_pruning_influxrpc() {
db,
metric_registry,
} = setup().await;
let db = Arc::new(db);
let predicate = PredicateBuilder::new()
// bar < 3.0

View File

@ -10,7 +10,6 @@ use std::{
io::BufWriter,
io::Write,
path::{Path, PathBuf},
sync::Arc,
};
use self::{parse::TestQueries, setup::TestSetup};
@ -239,7 +238,6 @@ impl<W: Write> Runner<W> {
let DbScenario {
scenario_name, db, ..
} = scenario;
let db = Arc::new(db);
writeln!(self.log, "Running scenario '{}'", scenario_name)?;
writeln!(self.log, "SQL: '{:#?}'", sql)?;

View File

@ -19,7 +19,7 @@ use server::{db::test_helpers::write_lp, Db};
#[derive(Debug)]
pub struct DbScenario {
pub scenario_name: String,
pub db: Db,
pub db: Arc<Db>,
}
#[async_trait]
@ -853,7 +853,7 @@ pub async fn make_two_chunk_scenarios(
}
/// Rollover the mutable buffer and load chunk 0 to the read buffer and object store
pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
pub async fn rollover_and_load(db: &Arc<Db>, partition_key: &str, table_name: &str) {
db.rollover_partition(table_name, partition_key)
.await
.unwrap();

View File

@ -7,7 +7,6 @@ use super::scenarios::*;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner};
use std::sync::Arc;
/// runs table_names(predicate) and compares it to the expected
/// output
@ -19,7 +18,6 @@ macro_rules! run_sql_test_case {
let DbScenario {
scenario_name, db, ..
} = scenario;
let db = Arc::new(db);
println!("Running scenario '{}'", scenario_name);
println!("SQL: '{:#?}'", sql);

View File

@ -2,6 +2,7 @@
//! instances of the mutable buffer, read buffer, and object store
pub(crate) use crate::db::chunk::DbChunk;
use crate::db::lifecycle::ArcDb;
use crate::{
db::{
access::QueryCatalogAccess,
@ -331,13 +332,16 @@ impl Db {
}
pub fn lockable_chunk(
&self,
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
chunk_id: u32,
) -> catalog::Result<LockableCatalogChunk<'_>> {
) -> catalog::Result<LockableCatalogChunk> {
let chunk = self.chunk(table_name, partition_key, chunk_id)?;
Ok(LockableCatalogChunk { db: self, chunk })
Ok(LockableCatalogChunk {
db: Arc::clone(self),
chunk,
})
}
/// Drops the specified chunk from the catalog and all storage systems
@ -359,7 +363,7 @@ impl Db {
///
/// Returns a handle to the newly loaded chunk in the read buffer
pub async fn move_chunk_to_read_buffer(
&self,
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
chunk_id: u32,
@ -380,7 +384,7 @@ impl Db {
///
/// Returns a handle to the newly created chunk in the read buffer
pub async fn compact_partition(
&self,
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
) -> Result<Arc<DbChunk>> {
@ -389,7 +393,7 @@ impl Db {
let fut = {
let partition = self.partition(table_name, partition_key)?;
let partition = LockableCatalogPartition {
db: self,
db: Arc::clone(&self),
partition,
};
@ -412,7 +416,7 @@ impl Db {
/// Write given table of a given chunk to object store.
/// The writing only happen if that chunk already in read buffer
pub async fn write_chunk_to_object_store(
&self,
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
chunk_id: u32,
@ -425,7 +429,7 @@ impl Db {
/// Unload chunk from read buffer but keep it in object store
pub fn unload_read_buffer(
&self,
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
chunk_id: u32,
@ -493,7 +497,7 @@ impl Db {
tokio::join!(
// lifecycle policy loop
async {
let mut policy = ::lifecycle::LifecyclePolicy::new(&self);
let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(&self)));
while !shutdown.is_cancelled() {
self.worker_iterations_lifecycle
@ -911,18 +915,17 @@ mod tests {
// Writes should be forwarded to the write buffer *and* the mutable buffer if both are
// configured.
let write_buffer = Arc::new(MockBuffer::default());
let test_db = TestDb::builder()
let db = TestDb::builder()
.write_buffer(Arc::clone(&write_buffer) as _)
.build()
.await
.db;
let entry = lp_to_entry("cpu bar=1 10");
test_db.store_entry(entry).await.unwrap();
db.store_entry(entry).await.unwrap();
assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
let db = Arc::new(test_db);
let batches = run_query(db, "select * from cpu").await;
let expected = vec![
@ -938,7 +941,7 @@ mod tests {
#[tokio::test]
async fn read_write() {
// This test also exercises the path without a write buffer.
let db = Arc::new(make_db().await.db);
let db = make_db().await.db;
write_lp(&db, "cpu bar=1 10").await;
let batches = run_query(db, "select * from cpu").await;
@ -955,7 +958,7 @@ mod tests {
#[tokio::test]
async fn try_all_partition_writes_when_some_fail() {
let db = Arc::new(make_db().await.db);
let db = make_db().await.db;
let nanoseconds_per_hour = 60 * 60 * 1_000_000_000u64;
@ -1032,7 +1035,7 @@ mod tests {
#[tokio::test]
async fn metrics_during_rollover() {
let test_db = make_db().await;
let db = Arc::new(test_db.db);
let db = test_db.db;
write_lp(db.as_ref(), "cpu bar=1 10").await;
@ -1172,7 +1175,7 @@ mod tests {
#[tokio::test]
async fn write_with_rollover() {
let db = Arc::new(make_db().await.db);
let db = make_db().await.db;
write_lp(db.as_ref(), "cpu bar=1 10").await;
assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
@ -2511,7 +2514,7 @@ mod tests {
// replay)
let mut chunks = vec![];
for _ in 0..2 {
chunks.push(create_parquet_chunk(db.as_ref()).await);
chunks.push(create_parquet_chunk(&db).await);
}
// ==================== check: catalog state ====================
@ -2594,7 +2597,7 @@ mod tests {
// 2: dropped (not in current catalog but parquet file still present for time travel)
let mut paths_keep = vec![];
for i in 0..3i8 {
let (table_name, partition_key, chunk_id) = create_parquet_chunk(db.as_ref()).await;
let (table_name, partition_key, chunk_id) = create_parquet_chunk(&db).await;
let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap();
let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
@ -2686,7 +2689,7 @@ mod tests {
// replay)
let mut chunks = vec![];
for _ in 0..2 {
chunks.push(create_parquet_chunk(db.as_ref()).await);
chunks.push(create_parquet_chunk(&db).await);
}
// ==================== do: remove .txn files ====================
@ -2739,7 +2742,7 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
write_lp(db, "cpu bar=1 10").await;
let partition_key = "1970-01-01T00";
let table_name = "cpu";

View File

@ -34,6 +34,18 @@ mod move_chunk;
mod unload;
mod write;
/// A newtype wrapper around `Arc<Db>` to workaround trait orphan rules
#[derive(Debug, Clone)]
pub struct ArcDb(pub(super) Arc<Db>);
impl std::ops::Deref for ArcDb {
type Target = Db;
fn deref(&self) -> &Self::Target {
&self.0
}
}
///
/// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
///
@ -42,12 +54,12 @@ mod write;
/// without allowing concurrent modification
///
#[derive(Debug, Clone)]
pub struct LockableCatalogChunk<'a> {
pub db: &'a Db,
pub struct LockableCatalogChunk {
pub db: Arc<Db>,
pub chunk: Arc<RwLock<CatalogChunk>>,
}
impl<'a> LockableChunk for LockableCatalogChunk<'a> {
impl LockableChunk for LockableCatalogChunk {
type Chunk = CatalogChunk;
type Job = Job;
@ -98,15 +110,15 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
/// without allowing concurrent modification
///
#[derive(Debug, Clone)]
pub struct LockableCatalogPartition<'a> {
pub db: &'a Db,
pub struct LockableCatalogPartition {
pub db: Arc<Db>,
pub partition: Arc<RwLock<Partition>>,
}
impl<'a> LockablePartition for LockableCatalogPartition<'a> {
impl LockablePartition for LockableCatalogPartition {
type Partition = Partition;
type Chunk = LockableCatalogChunk<'a>;
type Chunk = LockableCatalogChunk;
type Error = super::lifecycle::Error;
@ -123,19 +135,18 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
chunk_id: u32,
) -> Option<Self::Chunk> {
s.chunk(chunk_id).map(|chunk| LockableCatalogChunk {
db: s.data().db,
db: Arc::clone(&s.data().db),
chunk: Arc::clone(chunk),
})
}
fn chunks(s: &LifecycleReadGuard<'_, Self::Partition, Self>) -> Vec<(u32, Self::Chunk)> {
let db = s.data().db;
s.keyed_chunks()
.map(|(id, chunk)| {
(
id,
LockableCatalogChunk {
db,
db: Arc::clone(&s.data().db),
chunk: Arc::clone(chunk),
},
)
@ -162,24 +173,24 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
}
}
impl<'a> LifecycleDb for &'a Db {
type Chunk = LockableCatalogChunk<'a>;
type Partition = LockableCatalogPartition<'a>;
impl LifecycleDb for ArcDb {
type Chunk = LockableCatalogChunk;
type Partition = LockableCatalogPartition;
fn buffer_size(self) -> usize {
fn buffer_size(&self) -> usize {
self.catalog.metrics().memory().total()
}
fn rules(self) -> LifecycleRules {
fn rules(&self) -> LifecycleRules {
self.rules.read().lifecycle_rules.clone()
}
fn partitions(self) -> Vec<Self::Partition> {
fn partitions(&self) -> Vec<Self::Partition> {
self.catalog
.partitions()
.into_iter()
.map(|partition| LockableCatalogPartition {
db: self,
db: Arc::clone(&self.0),
partition,
})
.collect()

View File

@ -25,13 +25,13 @@ use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition};
///
/// TODO: Replace low-level locks with transaction object
pub(crate) fn compact_chunks(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition<'_>>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>>,
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = partition.data().db;
let db = Arc::clone(&partition.data().db);
let table_name = partition.table_name().to_string();
let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect();
@ -47,7 +47,7 @@ pub(crate) fn compact_chunks(
.into_iter()
.map(|mut chunk| {
// Sanity-check
assert!(std::ptr::eq(db, chunk.data().db));
assert!(Arc::ptr_eq(&db, &chunk.data().db));
assert_eq!(chunk.table_name().as_ref(), table_name.as_str());
chunk.set_compacting(&registration)?;

View File

@ -17,12 +17,12 @@ use super::{error::Result, LockableCatalogChunk};
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub fn move_chunk_to_read_buffer(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = guard.data().db;
let db = Arc::clone(&guard.data().db);
let addr = guard.addr().clone();
// TODO: Use ChunkAddr within Job
let (tracker, registration) = db.jobs.register(Job::CloseChunk {

View File

@ -12,7 +12,7 @@ use super::LockableCatalogChunk;
use super::error::Result;
pub fn unload_read_buffer_chunk(
mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
) -> Result<Arc<DbChunk>> {
debug!(chunk=%chunk.addr(), "unloading chunk from read buffer");

View File

@ -33,12 +33,12 @@ use super::error::{
/// Returns a future registered with the tracker registry, and the corresponding tracker
/// The caller can either spawn this future to tokio, or block directly on it
pub fn write_chunk_to_object_store(
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)> {
let db = guard.data().db;
let db = Arc::clone(&guard.data().db);
let addr = guard.addr().clone();
// TODO: Use ChunkAddr within Job
@ -64,11 +64,6 @@ pub fn write_chunk_to_object_store(
.catalog_transactions_until_checkpoint
.get();
let preserved_catalog = Arc::clone(&db.preserved_catalog);
let catalog = Arc::clone(&db.catalog);
let object_store = Arc::clone(&db.store);
let cleanup_lock = Arc::clone(&db.cleanup_lock);
// Drop locks
let chunk = guard.unwrap().chunk;
@ -106,7 +101,7 @@ pub fn write_chunk_to_object_store(
// catalog-level transaction for preservation layer
{
// fetch shared (= read) guard preventing the cleanup job from deleting our files
let _guard = cleanup_lock.read().await;
let _guard = db.cleanup_lock.read().await;
// Write this table data into the object store
//
@ -124,14 +119,16 @@ pub fn write_chunk_to_object_store(
.context(WritingToObjectStore)?;
let parquet_metadata = Arc::new(parquet_metadata);
let metrics = catalog
let metrics = db
.catalog
.metrics_registry
.register_domain_with_labels("parquet", catalog.metric_labels.clone());
let metrics = ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
.register_domain_with_labels("parquet", db.catalog.metric_labels.clone());
let metrics =
ParquetChunkMetrics::new(&metrics, db.catalog.metrics().memory().parquet());
let parquet_chunk = Arc::new(
ParquetChunk::new(
path.clone(),
object_store,
Arc::clone(&db.store),
Arc::clone(&parquet_metadata),
metrics,
)
@ -144,7 +141,7 @@ pub fn write_chunk_to_object_store(
// transaction lock (that is part of the PreservedCatalog) for too long. By using the
// cleanup lock (see above) it is ensured that the file that we have written is not deleted
// in between.
let mut transaction = preserved_catalog.open_transaction().await;
let mut transaction = db.preserved_catalog.open_transaction().await;
transaction
.add_parquet(&path, &parquet_metadata)
.context(TransactionError)?;
@ -169,7 +166,7 @@ pub fn write_chunk_to_object_store(
// transaction lock. Therefore we don't need to worry about concurrent modifications of
// preserved chunks.
if let Err(e) = ckpt_handle
.create_checkpoint(checkpoint_data_from_catalog(&catalog))
.create_checkpoint(checkpoint_data_from_catalog(&db.catalog))
.await
{
warn!(%e, "cannot create catalog checkpoint");

View File

@ -19,7 +19,7 @@ use crate::{
// of a Db and its metrics.
#[derive(Debug)]
pub struct TestDb {
pub db: Db,
pub db: Arc<Db>,
pub metric_registry: metrics::TestMetricRegistry,
}
@ -92,7 +92,7 @@ impl TestDbBuilder {
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))),
}
}