Merge branch 'main' into pd-remove-mb-size-limit-checks
commit 404da38d6f
@@ -49,22 +49,19 @@ impl ChunkLifecycleAction {
 }
 
 /// A trait that encapsulates the database logic that is automated by `LifecyclePolicy`
-///
-/// Note: This trait is meant to be implemented for references to types allowing them
-/// to yield `LifecycleDb::Chunk` with non-static lifetimes
 pub trait LifecycleDb {
     type Chunk: LockableChunk;
     type Partition: LockablePartition;
 
     /// Return the in-memory size of the database. We expect this
     /// to change from call to call as chunks are dropped
-    fn buffer_size(self) -> usize;
+    fn buffer_size(&self) -> usize;
 
     /// Returns the lifecycle policy
-    fn rules(self) -> LifecycleRules;
+    fn rules(&self) -> LifecycleRules;
 
     /// Returns a list of lockable partitions in the database
-    fn partitions(self) -> Vec<Self::Partition>;
+    fn partitions(&self) -> Vec<Self::Partition>;
 }
 
 /// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
@@ -22,22 +22,22 @@ pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
 ///
 /// `LifecyclePolicy::check_for_work` can then be used to drive progress
 /// of the `LifecycleChunk` contained within this `LifecycleDb`
-pub struct LifecyclePolicy<'a, M>
+pub struct LifecyclePolicy<M>
 where
-    &'a M: LifecycleDb,
+    M: LifecycleDb,
 {
     /// The `LifecycleDb` this policy is automating
-    db: &'a M,
+    db: M,
 
     /// Background tasks spawned by this `LifecyclePolicy`
     trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
 }
 
-impl<'a, M> LifecyclePolicy<'a, M>
+impl<M> LifecyclePolicy<M>
 where
-    &'a M: LifecycleDb,
+    M: LifecycleDb,
 {
-    pub fn new(db: &'a M) -> Self {
+    pub fn new(db: M) -> Self {
         Self {
             db,
             trackers: vec![],
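Note: the two hunks above change `LifecycleDb` from a trait implemented on `&'a Db` with by-value methods into a trait with `&self` methods, so `LifecyclePolicy` can own its database handle instead of borrowing it. Below is a minimal, self-contained sketch of that shape; the types are toy stand-ins, not the IOx definitions (the `Chunk`/`Partition` associated types and the real rules are elided).

```rust
use std::sync::Arc;

#[derive(Clone, Default)]
struct Rules {
    buffer_size_soft: usize,
}

trait LifecycleDb {
    fn buffer_size(&self) -> usize;
    fn rules(&self) -> Rules;
}

// An owned, cheaply cloneable handle can implement the trait directly,
// instead of implementing it for `&'a Db` and threading lifetimes around.
#[derive(Clone)]
struct ArcDb(Arc<Vec<u8>>);

impl LifecycleDb for ArcDb {
    fn buffer_size(&self) -> usize {
        self.0.len()
    }

    fn rules(&self) -> Rules {
        Rules { buffer_size_soft: 1024 }
    }
}

// The policy now stores the handle by value: no `&'a M: LifecycleDb` bound.
struct LifecyclePolicy<M>
where
    M: LifecycleDb,
{
    db: M,
}

impl<M> LifecyclePolicy<M>
where
    M: LifecycleDb,
{
    fn new(db: M) -> Self {
        Self { db }
    }

    fn over_soft_limit(&self) -> bool {
        self.db.buffer_size() > self.db.rules().buffer_size_soft
    }
}

fn main() {
    let policy = LifecyclePolicy::new(ArcDb(Arc::new(vec![0; 2048])));
    assert!(policy.over_soft_limit());
}
```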
@@ -423,9 +423,9 @@ where
     }
 }
 
-impl<'a, M> Debug for LifecyclePolicy<'a, M>
+impl<M> Debug for LifecyclePolicy<M>
 where
-    &'a M: LifecycleDb,
+    M: LifecycleDb + Copy,
 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "LifecyclePolicy{{..}}")
@@ -817,7 +817,7 @@ mod tests {
         type Chunk = TestLockableChunk<'a>;
         type Partition = TestLockablePartition<'a>;
 
-        fn buffer_size(self) -> usize {
+        fn buffer_size(&self) -> usize {
             // All chunks are 20 bytes
             self.partitions
                 .read()
@@ -826,11 +826,11 @@ mod tests {
                 .sum()
         }
 
-        fn rules(self) -> LifecycleRules {
+        fn rules(&self) -> LifecycleRules {
             self.rules.clone()
         }
 
-        fn partitions(self) -> Vec<Self::Partition> {
+        fn partitions(&self) -> Vec<Self::Partition> {
             self.partitions
                 .read()
                 .iter()
@@ -32,7 +32,7 @@ macro_rules! run_field_columns_test_case {
             let executor = db.executor();
 
             let plan = planner
-                .field_columns(&db, predicate.clone())
+                .field_columns(db.as_ref(), predicate.clone())
                 .expect("built plan successfully");
             let fields = executor
                 .to_field_list(plan)
@@ -132,7 +132,7 @@ async fn test_field_name_plan() {
     let planner = InfluxRpcPlanner::new();
 
     let plan = planner
-        .field_columns(&db, predicate.clone())
+        .field_columns(db.as_ref(), predicate.clone())
         .expect("built plan successfully");
 
     let mut plans = plan.plans;
@@ -50,7 +50,7 @@ macro_rules! run_read_filter_test_case {
             let planner = InfluxRpcPlanner::new();
 
             let plan = planner
-                .read_filter(&db, predicate.clone())
+                .read_filter(db.as_ref(), predicate.clone())
                 .expect("built plan successfully");
 
             let string_results = run_series_set_plan(db.executor(), plan).await;
@@ -29,7 +29,7 @@ macro_rules! run_read_group_test_case {
             let planner = InfluxRpcPlanner::new();
 
             let plans = planner
-                .read_group(&db, predicate.clone(), agg, &group_columns)
+                .read_group(db.as_ref(), predicate.clone(), agg, &group_columns)
                 .expect("built plan successfully");
 
             let plans = plans.into_inner();
@@ -32,7 +32,13 @@ macro_rules! run_read_window_aggregate_test_case {
             let planner = InfluxRpcPlanner::new();
 
             let plans = planner
-                .read_window_aggregate(&db, predicate.clone(), agg, every.clone(), offset.clone())
+                .read_window_aggregate(
+                    db.as_ref(),
+                    predicate.clone(),
+                    agg,
+                    every.clone(),
+                    offset.clone(),
+                )
                 .expect("built plan successfully");
 
             let plans = plans.into_inner();
@@ -22,7 +22,7 @@ macro_rules! run_table_names_test_case {
             let planner = InfluxRpcPlanner::new();
 
             let plan = planner
-                .table_names(&db, predicate.clone())
+                .table_names(db.as_ref(), predicate.clone())
                 .expect("built plan successfully");
             let names = db
                 .executor()
@@ -26,7 +26,7 @@ macro_rules! run_tag_keys_test_case {
            let planner = InfluxRpcPlanner::new();
 
            let plan = planner
-                .tag_keys(&db, predicate.clone())
+                .tag_keys(db.as_ref(), predicate.clone())
                .expect("built plan successfully");
            let names = db
                .executor()
@@ -24,7 +24,7 @@ macro_rules! run_tag_values_test_case {
             let planner = InfluxRpcPlanner::new();
 
             let plan = planner
-                .tag_values(&db, &tag_name, predicate.clone())
+                .tag_values(db.as_ref(), &tag_name, predicate.clone())
                 .expect("built plan successfully");
             let names = db
                 .executor()
@@ -242,7 +242,7 @@ async fn list_tag_values_field_col() {
 
     // Test: temp is a field, not a tag
     let tag_name = "temp";
-    let plan_result = planner.tag_values(&db, &tag_name, predicate.clone());
+    let plan_result = planner.tag_values(db.as_ref(), &tag_name, predicate.clone());
 
     assert_eq!(
         plan_result.unwrap_err().to_string(),
@@ -58,7 +58,6 @@ async fn chunk_pruning_sql() {
         db,
         metric_registry,
     } = setup().await;
-    let db = Arc::new(db);
 
     let expected = vec![
         "+-----+-------------------------------+",
@@ -114,7 +113,6 @@ async fn chunk_pruning_influxrpc() {
         db,
         metric_registry,
     } = setup().await;
-    let db = Arc::new(db);
 
     let predicate = PredicateBuilder::new()
         // bar < 3.0
@@ -10,7 +10,6 @@ use std::{
     io::BufWriter,
     io::Write,
     path::{Path, PathBuf},
-    sync::Arc,
 };
 
 use self::{parse::TestQueries, setup::TestSetup};
@@ -239,7 +238,6 @@ impl<W: Write> Runner<W> {
             let DbScenario {
                 scenario_name, db, ..
             } = scenario;
-            let db = Arc::new(db);
 
             writeln!(self.log, "Running scenario '{}'", scenario_name)?;
             writeln!(self.log, "SQL: '{:#?}'", sql)?;
@@ -19,7 +19,7 @@ use server::{db::test_helpers::write_lp, Db};
 #[derive(Debug)]
 pub struct DbScenario {
     pub scenario_name: String,
-    pub db: Db,
+    pub db: Arc<Db>,
 }
 
 #[async_trait]
@@ -853,7 +853,7 @@ pub async fn make_two_chunk_scenarios(
 }
 
 /// Rollover the mutable buffer and load chunk 0 to the read buffer and object store
-pub async fn rollover_and_load(db: &Db, partition_key: &str, table_name: &str) {
+pub async fn rollover_and_load(db: &Arc<Db>, partition_key: &str, table_name: &str) {
     db.rollover_partition(table_name, partition_key)
         .await
         .unwrap();
@@ -7,7 +7,6 @@ use super::scenarios::*;
 use arrow::record_batch::RecordBatch;
 use arrow_util::assert_batches_sorted_eq;
 use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner};
-use std::sync::Arc;
 
 /// runs table_names(predicate) and compares it to the expected
 /// output
@@ -19,7 +18,6 @@ macro_rules! run_sql_test_case {
             let DbScenario {
                 scenario_name, db, ..
             } = scenario;
-            let db = Arc::new(db);
 
             println!("Running scenario '{}'", scenario_name);
             println!("SQL: '{:#?}'", sql);
@@ -2,6 +2,7 @@
 //! instances of the mutable buffer, read buffer, and object store
 
 pub(crate) use crate::db::chunk::DbChunk;
+use crate::db::lifecycle::ArcDb;
 use crate::{
     db::{
         access::QueryCatalogAccess,
@@ -330,13 +331,16 @@ impl Db {
     }
 
     pub fn lockable_chunk(
-        &self,
+        self: &Arc<Self>,
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
-    ) -> catalog::Result<LockableCatalogChunk<'_>> {
+    ) -> catalog::Result<LockableCatalogChunk> {
         let chunk = self.chunk(table_name, partition_key, chunk_id)?;
-        Ok(LockableCatalogChunk { db: self, chunk })
+        Ok(LockableCatalogChunk {
+            db: Arc::clone(self),
+            chunk,
+        })
     }
 
     /// Drops the specified chunk from the catalog and all storage systems
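Note: in this and the following hunks the `&self` receivers on these `Db` methods become `self: &Arc<Self>`, so each method can hand out owned `Arc<Db>` clones (for `LockableCatalogChunk` and friends) rather than borrows tied to `&self`. A small illustrative sketch of that receiver form, with made-up types:

```rust
use std::sync::Arc;

struct Db {
    name: String,
}

// Owns a reference-counted handle to the database instead of a borrow.
struct Handle {
    db: Arc<Db>,
}

impl Db {
    // Callable only through an `Arc<Db>`; the receiver can be cloned cheaply
    // into returned structures with no lifetime parameter.
    fn handle(self: &Arc<Self>) -> Handle {
        Handle {
            db: Arc::clone(self),
        }
    }
}

fn main() {
    let db = Arc::new(Db { name: "example".into() });
    let h = db.handle();
    assert_eq!(h.db.name, "example");
    assert_eq!(db.name, "example"); // original handle still usable
}
```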
@@ -358,7 +362,7 @@ impl Db {
     ///
     /// Returns a handle to the newly loaded chunk in the read buffer
     pub async fn move_chunk_to_read_buffer(
-        &self,
+        self: &Arc<Self>,
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
@@ -379,7 +383,7 @@ impl Db {
     ///
     /// Returns a handle to the newly created chunk in the read buffer
     pub async fn compact_partition(
-        &self,
+        self: &Arc<Self>,
         table_name: &str,
         partition_key: &str,
     ) -> Result<Arc<DbChunk>> {
@@ -388,7 +392,7 @@ impl Db {
         let fut = {
             let partition = self.partition(table_name, partition_key)?;
             let partition = LockableCatalogPartition {
-                db: self,
+                db: Arc::clone(&self),
                 partition,
             };
 
@@ -411,7 +415,7 @@ impl Db {
     /// Write given table of a given chunk to object store.
     /// The writing only happen if that chunk already in read buffer
     pub async fn write_chunk_to_object_store(
-        &self,
+        self: &Arc<Self>,
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
@@ -424,7 +428,7 @@ impl Db {
 
     /// Unload chunk from read buffer but keep it in object store
     pub fn unload_read_buffer(
-        &self,
+        self: &Arc<Self>,
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
@@ -492,7 +496,7 @@ impl Db {
         tokio::join!(
             // lifecycle policy loop
             async {
-                let mut policy = ::lifecycle::LifecyclePolicy::new(&self);
+                let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(&self)));
 
                 while !shutdown.is_cancelled() {
                     self.worker_iterations_lifecycle
@@ -893,18 +897,17 @@ mod tests {
         // Writes should be forwarded to the write buffer *and* the mutable buffer if both are
         // configured.
         let write_buffer = Arc::new(MockBuffer::default());
-        let test_db = TestDb::builder()
+        let db = TestDb::builder()
             .write_buffer(Arc::clone(&write_buffer) as _)
             .build()
             .await
             .db;
 
         let entry = lp_to_entry("cpu bar=1 10");
-        test_db.store_entry(entry).await.unwrap();
+        db.store_entry(entry).await.unwrap();
 
         assert_eq!(write_buffer.entries.lock().unwrap().len(), 1);
 
-        let db = Arc::new(test_db);
         let batches = run_query(db, "select * from cpu").await;
 
         let expected = vec![
@@ -920,7 +923,7 @@ mod tests {
     #[tokio::test]
     async fn read_write() {
         // This test also exercises the path without a write buffer.
-        let db = Arc::new(make_db().await.db);
+        let db = make_db().await.db;
         write_lp(&db, "cpu bar=1 10").await;
 
         let batches = run_query(db, "select * from cpu").await;
@@ -937,7 +940,7 @@ mod tests {
 
     #[tokio::test]
     async fn try_all_partition_writes_when_some_fail() {
-        let db = Arc::new(make_db().await.db);
+        let db = make_db().await.db;
 
         let nanoseconds_per_hour = 60 * 60 * 1_000_000_000u64;
 
@@ -1014,7 +1017,7 @@ mod tests {
     #[tokio::test]
     async fn metrics_during_rollover() {
         let test_db = make_db().await;
-        let db = Arc::new(test_db.db);
+        let db = test_db.db;
 
         write_lp(db.as_ref(), "cpu bar=1 10").await;
 
@@ -1154,7 +1157,7 @@ mod tests {
 
     #[tokio::test]
     async fn write_with_rollover() {
-        let db = Arc::new(make_db().await.db);
+        let db = make_db().await.db;
         write_lp(db.as_ref(), "cpu bar=1 10").await;
         assert_eq!(vec!["1970-01-01T00"], db.partition_keys().unwrap());
 
@@ -2459,7 +2462,7 @@ mod tests {
         // replay)
         let mut chunks = vec![];
         for _ in 0..2 {
-            chunks.push(create_parquet_chunk(db.as_ref()).await);
+            chunks.push(create_parquet_chunk(&db).await);
         }
 
         // ==================== check: catalog state ====================
@@ -2542,7 +2545,7 @@ mod tests {
         // 2: dropped (not in current catalog but parquet file still present for time travel)
         let mut paths_keep = vec![];
         for i in 0..3i8 {
-            let (table_name, partition_key, chunk_id) = create_parquet_chunk(db.as_ref()).await;
+            let (table_name, partition_key, chunk_id) = create_parquet_chunk(&db).await;
             let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap();
             let chunk = chunk.read();
             if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
@@ -2634,7 +2637,7 @@ mod tests {
         // replay)
         let mut chunks = vec![];
         for _ in 0..2 {
-            chunks.push(create_parquet_chunk(db.as_ref()).await);
+            chunks.push(create_parquet_chunk(&db).await);
         }
 
         // ==================== do: remove .txn files ====================
@@ -2687,7 +2690,7 @@ mod tests {
         write_lp(db.as_ref(), "cpu bar=1 10").await;
     }
 
-    async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {
+    async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
         write_lp(db, "cpu bar=1 10").await;
         let partition_key = "1970-01-01T00";
         let table_name = "cpu";
@@ -34,6 +34,18 @@ mod move_chunk;
 mod unload;
 mod write;
 
+/// A newtype wrapper around `Arc<Db>` to workaround trait orphan rules
+#[derive(Debug, Clone)]
+pub struct ArcDb(pub(super) Arc<Db>);
+
+impl std::ops::Deref for ArcDb {
+    type Target = Db;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
 ///
 /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
 ///
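Note: the `ArcDb` newtype is needed because the coherence (orphan) rules forbid this crate from writing `impl LifecycleDb for Arc<Db>` directly; neither the trait nor `Arc` is defined here. Wrapping the `Arc` in a local type makes the impl legal, and `Deref` keeps the impl body ergonomic. A hedged, self-contained sketch of the same pattern with stand-in names (in the sketch everything lives in one file, so only the comment marks which pieces are foreign in the real layout):

```rust
use std::{ops::Deref, sync::Arc};

// Stand-ins: `Db` is local to the crate, `ForeignTrait` lives in another crate.
#[derive(Debug)]
struct Db {
    name: String,
}

trait ForeignTrait {
    fn describe(&self) -> String;
}

// Local newtype around the foreign `Arc`; this is what satisfies coherence.
#[derive(Debug, Clone)]
pub struct ArcDb(Arc<Db>);

impl Deref for ArcDb {
    type Target = Db;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

// Allowed because `ArcDb` is a local type; with a genuinely foreign trait,
// `impl ForeignTrait for Arc<Db>` would be rejected (E0117).
impl ForeignTrait for ArcDb {
    fn describe(&self) -> String {
        // `Deref` lets the impl reach `Db`'s fields directly.
        format!("db {}", self.name)
    }
}

fn main() {
    let db = ArcDb(Arc::new(Db { name: "example".into() }));
    println!("{}", db.describe());
}
```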
@@ -42,12 +54,12 @@ mod write;
 /// without allowing concurrent modification
 ///
 #[derive(Debug, Clone)]
-pub struct LockableCatalogChunk<'a> {
-    pub db: &'a Db,
+pub struct LockableCatalogChunk {
+    pub db: Arc<Db>,
     pub chunk: Arc<RwLock<CatalogChunk>>,
 }
 
-impl<'a> LockableChunk for LockableCatalogChunk<'a> {
+impl LockableChunk for LockableCatalogChunk {
     type Chunk = CatalogChunk;
 
     type Job = Job;
@@ -98,15 +110,15 @@ impl<'a> LockableChunk for LockableCatalogChunk<'a> {
 /// without allowing concurrent modification
 ///
 #[derive(Debug, Clone)]
-pub struct LockableCatalogPartition<'a> {
-    pub db: &'a Db,
+pub struct LockableCatalogPartition {
+    pub db: Arc<Db>,
     pub partition: Arc<RwLock<Partition>>,
 }
 
-impl<'a> LockablePartition for LockableCatalogPartition<'a> {
+impl LockablePartition for LockableCatalogPartition {
     type Partition = Partition;
 
-    type Chunk = LockableCatalogChunk<'a>;
+    type Chunk = LockableCatalogChunk;
 
     type Error = super::lifecycle::Error;
 
@@ -123,19 +135,18 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
         chunk_id: u32,
     ) -> Option<Self::Chunk> {
         s.chunk(chunk_id).map(|chunk| LockableCatalogChunk {
-            db: s.data().db,
+            db: Arc::clone(&s.data().db),
             chunk: Arc::clone(chunk),
         })
     }
 
     fn chunks(s: &LifecycleReadGuard<'_, Self::Partition, Self>) -> Vec<(u32, Self::Chunk)> {
-        let db = s.data().db;
         s.keyed_chunks()
             .map(|(id, chunk)| {
                 (
                     id,
                     LockableCatalogChunk {
-                        db,
+                        db: Arc::clone(&s.data().db),
                         chunk: Arc::clone(chunk),
                     },
                 )
@@ -162,24 +173,24 @@ impl<'a> LockablePartition for LockableCatalogPartition<'a> {
     }
 }
 
-impl<'a> LifecycleDb for &'a Db {
-    type Chunk = LockableCatalogChunk<'a>;
-    type Partition = LockableCatalogPartition<'a>;
+impl LifecycleDb for ArcDb {
+    type Chunk = LockableCatalogChunk;
+    type Partition = LockableCatalogPartition;
 
-    fn buffer_size(self) -> usize {
+    fn buffer_size(&self) -> usize {
         self.catalog.metrics().memory().total()
     }
 
-    fn rules(self) -> LifecycleRules {
+    fn rules(&self) -> LifecycleRules {
         self.rules.read().lifecycle_rules.clone()
     }
 
-    fn partitions(self) -> Vec<Self::Partition> {
+    fn partitions(&self) -> Vec<Self::Partition> {
         self.catalog
             .partitions()
             .into_iter()
             .map(|partition| LockableCatalogPartition {
                 db: Arc::clone(&self.0),
                 partition,
             })
             .collect()
@@ -25,13 +25,13 @@ use super::{error::Result, LockableCatalogChunk, LockableCatalogPartition};
 ///
 /// TODO: Replace low-level locks with transaction object
 pub(crate) fn compact_chunks(
-    partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition<'_>>,
-    chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>>,
+    partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
+    chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
 ) -> Result<(
     TaskTracker<Job>,
     TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
 )> {
-    let db = partition.data().db;
+    let db = Arc::clone(&partition.data().db);
     let table_name = partition.table_name().to_string();
     let chunk_ids: Vec<_> = chunks.iter().map(|x| x.id()).collect();
 
@@ -47,7 +47,7 @@ pub(crate) fn compact_chunks(
         .into_iter()
         .map(|mut chunk| {
             // Sanity-check
-            assert!(std::ptr::eq(db, chunk.data().db));
+            assert!(Arc::ptr_eq(&db, &chunk.data().db));
             assert_eq!(chunk.table_name().as_ref(), table_name.as_str());
 
             chunk.set_compacting(&registration)?;
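Note: with `db` now an `Arc<Db>` rather than a `&Db`, the sanity check switches from raw-pointer comparison to `Arc::ptr_eq`, which compares the allocations that two `Arc`s point to. A tiny standalone illustration (not IOx code):

```rust
use std::sync::Arc;

fn main() {
    let a = Arc::new(42);
    let b = Arc::clone(&a);
    let c = Arc::new(42);

    assert!(Arc::ptr_eq(&a, &b)); // clones share one allocation
    assert!(!Arc::ptr_eq(&a, &c)); // equal values, distinct allocations
}
```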
@@ -17,12 +17,12 @@ use super::{error::Result, LockableCatalogChunk};
 /// Returns a future registered with the tracker registry, and the corresponding tracker
 /// The caller can either spawn this future to tokio, or block directly on it
 pub fn move_chunk_to_read_buffer(
-    mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
+    mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
 ) -> Result<(
     TaskTracker<Job>,
     TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
 )> {
-    let db = guard.data().db;
+    let db = Arc::clone(&guard.data().db);
     let addr = guard.addr().clone();
     // TODO: Use ChunkAddr within Job
     let (tracker, registration) = db.jobs.register(Job::CloseChunk {
@@ -12,7 +12,7 @@ use super::LockableCatalogChunk;
 use super::error::Result;
 
 pub fn unload_read_buffer_chunk(
-    mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
+    mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
 ) -> Result<Arc<DbChunk>> {
     debug!(chunk=%chunk.addr(), "unloading chunk from read buffer");
 
@@ -33,12 +33,12 @@ use super::error::{
 /// Returns a future registered with the tracker registry, and the corresponding tracker
 /// The caller can either spawn this future to tokio, or block directly on it
 pub fn write_chunk_to_object_store(
-    mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk<'_>>,
+    mut guard: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
 ) -> Result<(
     TaskTracker<Job>,
     TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
 )> {
-    let db = guard.data().db;
+    let db = Arc::clone(&guard.data().db);
     let addr = guard.addr().clone();
 
     // TODO: Use ChunkAddr within Job
@@ -64,11 +64,6 @@ pub fn write_chunk_to_object_store(
         .catalog_transactions_until_checkpoint
         .get();
 
-    let preserved_catalog = Arc::clone(&db.preserved_catalog);
-    let catalog = Arc::clone(&db.catalog);
-    let object_store = Arc::clone(&db.store);
-    let cleanup_lock = Arc::clone(&db.cleanup_lock);
-
     // Drop locks
     let chunk = guard.unwrap().chunk;
 
@@ -106,7 +101,7 @@ pub fn write_chunk_to_object_store(
     // catalog-level transaction for preservation layer
     {
         // fetch shared (= read) guard preventing the cleanup job from deleting our files
-        let _guard = cleanup_lock.read().await;
+        let _guard = db.cleanup_lock.read().await;
 
         // Write this table data into the object store
         //
@@ -124,14 +119,16 @@ pub fn write_chunk_to_object_store(
             .context(WritingToObjectStore)?;
         let parquet_metadata = Arc::new(parquet_metadata);
 
-        let metrics = catalog
+        let metrics = db
+            .catalog
             .metrics_registry
-            .register_domain_with_labels("parquet", catalog.metric_labels.clone());
-        let metrics = ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet());
+            .register_domain_with_labels("parquet", db.catalog.metric_labels.clone());
+        let metrics =
+            ParquetChunkMetrics::new(&metrics, db.catalog.metrics().memory().parquet());
         let parquet_chunk = Arc::new(
             ParquetChunk::new(
                 path.clone(),
-                object_store,
+                Arc::clone(&db.store),
                 Arc::clone(&parquet_metadata),
                 metrics,
             )
@@ -144,7 +141,7 @@ pub fn write_chunk_to_object_store(
         // transaction lock (that is part of the PreservedCatalog) for too long. By using the
         // cleanup lock (see above) it is ensured that the file that we have written is not deleted
         // in between.
-        let mut transaction = preserved_catalog.open_transaction().await;
+        let mut transaction = db.preserved_catalog.open_transaction().await;
         transaction
             .add_parquet(&path, &parquet_metadata)
             .context(TransactionError)?;
@@ -169,7 +166,7 @@ pub fn write_chunk_to_object_store(
         // transaction lock. Therefore we don't need to worry about concurrent modifications of
         // preserved chunks.
        if let Err(e) = ckpt_handle
-            .create_checkpoint(checkpoint_data_from_catalog(&catalog))
+            .create_checkpoint(checkpoint_data_from_catalog(&db.catalog))
            .await
        {
            warn!(%e, "cannot create catalog checkpoint");
@@ -19,7 +19,7 @@ use crate::{
 // of a Db and its metrics.
 #[derive(Debug)]
 pub struct TestDb {
-    pub db: Db,
+    pub db: Arc<Db>,
     pub metric_registry: metrics::TestMetricRegistry,
 }
 
@@ -92,7 +92,7 @@ impl TestDbBuilder {
 
         TestDb {
             metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
-            db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
+            db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))),
         }
     }
 