Merge branch 'main' into ntran/no_use_stats

pull/24376/head
kodiakhq[bot] 2021-10-06 14:06:01 +00:00 committed by GitHub
commit 1aee2a49eb
17 changed files with 476 additions and 673 deletions

Cargo.lock generated
View File

@ -3174,6 +3174,7 @@ dependencies = [
"arrow",
"arrow_util",
"async-trait",
"chrono",
"data_types",
"datafusion 0.1.0",
"internal_types",

View File

@ -14,6 +14,6 @@ internal_types = { path = "../internal_types" }
observability_deps = { path = "../observability_deps" }
tokio = { version = "1.11", features = ["macros", "time"] }
tracker = { path = "../tracker" }
[dev-dependencies]
[dev-dependencies]
tokio = { version = "1.11", features = ["macros", "time", "rt"] }

View File

@ -15,7 +15,6 @@ use data_types::{
DatabaseName,
};
use internal_types::access::AccessMetrics;
use std::time::Instant;
use tracker::TaskTracker;
mod guard;
@ -86,7 +85,7 @@ pub trait LockablePartition: Sized + std::fmt::Display {
/// write has been present in memory
fn prepare_persist(
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
now: Instant,
now: DateTime<Utc>,
) -> Option<Self::PersistHandle>;
/// Split and persist chunks.
@ -158,10 +157,10 @@ pub trait LifecyclePartition {
///
/// `now` is the wall clock time that should be used to compute how long a given
/// write has been present in memory
fn persistable_row_count(&self, now: Instant) -> usize;
fn persistable_row_count(&self, now: DateTime<Utc>) -> usize;
/// Returns the age of the oldest unpersisted write
fn minimum_unpersisted_age(&self) -> Option<Instant>;
fn minimum_unpersisted_age(&self) -> Option<DateTime<Utc>>;
}
/// The lifecycle operates on chunks implementing this trait
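Not part of this diff — a minimal, hypothetical sketch of what the two DateTime-based methods above could look like for a toy partition that tracks the wall-clock arrival time of each buffered row (the real implementation delegates to PersistenceWindows; the names below are invented for illustration):

use chrono::{DateTime, Duration, Utc};

/// Illustrative only, not the real IOx type: a toy partition that records the
/// arrival time of each unpersisted row as a `DateTime<Utc>`.
struct ToyPartition {
    unpersisted_writes: Vec<DateTime<Utc>>,
    late_arrival: Duration, // stand-in for the partition's late-arrival window
}

impl ToyPartition {
    /// Rows that have been buffered in memory at least `late_arrival` as of `now`.
    fn persistable_row_count(&self, now: DateTime<Utc>) -> usize {
        self.unpersisted_writes
            .iter()
            .filter(|arrived| now - **arrived >= self.late_arrival)
            .count()
    }

    /// Arrival time of the oldest row still buffered in memory.
    fn minimum_unpersisted_age(&self) -> Option<DateTime<Utc>> {
        self.unpersisted_writes.iter().min().copied()
    }
}

fn main() {
    let now = Utc::now();
    let p = ToyPartition {
        unpersisted_writes: vec![now - Duration::seconds(30), now - Duration::seconds(5)],
        late_arrival: Duration::seconds(10),
    };
    assert_eq!(p.persistable_row_count(now), 1);
    assert_eq!(p.minimum_unpersisted_age(), Some(now - Duration::seconds(30)));
}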

View File

@ -82,8 +82,6 @@ where
/// - If persist is `true` it will only unload persisted chunks in order of creation time, starting with the oldest.
/// - If persist is `false` it will consider all chunks, also in order of creation time, starting with the oldest.
///
/// TODO: use LRU instead of creation time
///
fn maybe_free_memory<P: LockablePartition>(
&mut self,
db_name: &DatabaseName<'static>,
@ -343,7 +341,7 @@ where
db_name: &DatabaseName<'static>,
partition: &P,
rules: &LifecycleRules,
now: Instant,
now: DateTime<Utc>,
) -> bool {
// TODO: Encapsulate locking into a CatalogTransaction type
let partition = partition.read();
@ -353,19 +351,15 @@ where
return false;
}
let persistable_age_seconds = partition
let persistable_age_seconds: u32 = partition
.minimum_unpersisted_age()
.and_then(|minimum_unpersisted_age| {
// If writes happened between when the policy loop
// started and this check is done, the duration may be
// negative. Skip persistence in this case to avoid
// panic in `duration_since`
Some(
now.checked_duration_since(minimum_unpersisted_age)?
.as_secs(),
)
(now - minimum_unpersisted_age)
.num_seconds()
.try_into()
.ok()
})
.unwrap_or_default() as u32;
.unwrap_or_default();
let persistable_row_count = partition.persistable_row_count(now);
debug!(%db_name, %partition,
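Not part of this diff — a standalone sketch of the chrono arithmetic used above. Subtracting two DateTime<Utc> values yields a signed chrono::Duration, so a write that lands after the policy loop captured `now` produces a negative number of seconds; `try_into::<u32>()` then fails and the age falls back to zero (persistence is skipped), instead of the panic the old comment warned about for an unchecked `duration_since`:

use chrono::{Duration, TimeZone, Utc};
use std::convert::TryInto;

fn main() {
    let now = Utc.timestamp(100, 0);

    // Old write: positive age, converts cleanly to u32 seconds.
    let old_write = Utc.timestamp(40, 0);
    let age: u32 = (now - old_write)
        .num_seconds()
        .try_into()
        .ok()
        .unwrap_or_default();
    assert_eq!(age, 60);

    // Write that arrived after `now` was captured: negative duration, so the
    // u32 conversion fails and the age defaults to 0, i.e. "not old enough".
    let late_write = now + Duration::seconds(5);
    let age: u32 = (now - late_write)
        .num_seconds()
        .try_into()
        .ok()
        .unwrap_or_default();
    assert_eq!(age, 0);
}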
@ -512,8 +506,7 @@ where
// but persistence cannot proceed because of in-progress
// compactions
let stall_compaction_persisting = if rules.persist && !self.suppress_persistence {
let persisting =
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant);
let persisting = self.maybe_persist_chunks(&db_name, partition, &rules, now);
if persisting {
debug!(%db_name, %partition, reason="persisting", "stalling compaction");
}
@ -672,6 +665,7 @@ mod tests {
ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle,
};
use chrono::TimeZone;
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage};
use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
use std::{
@ -695,7 +689,7 @@ mod tests {
struct TestPartition {
chunks: BTreeMap<ChunkId, (ChunkOrder, Arc<RwLock<TestChunk>>)>,
persistable_row_count: usize,
minimum_unpersisted_age: Option<Instant>,
minimum_unpersisted_age: Option<DateTime<Utc>>,
max_persistable_timestamp: Option<DateTime<Utc>>,
next_id: ChunkId,
}
@ -704,7 +698,7 @@ mod tests {
fn with_persistence(
self,
persistable_row_count: usize,
minimum_unpersisted_age: Instant,
minimum_unpersisted_age: DateTime<Utc>,
max_persistable_timestamp: DateTime<Utc>,
) -> Self {
Self {
@ -890,7 +884,7 @@ mod tests {
fn prepare_persist(
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
_now: Instant,
_now: DateTime<Utc>,
) -> Option<Self::PersistHandle> {
Some(TestPersistHandle {
timestamp: partition.max_persistable_timestamp.unwrap(),
@ -983,11 +977,11 @@ mod tests {
false
}
fn persistable_row_count(&self, _now: Instant) -> usize {
fn persistable_row_count(&self, _now: DateTime<Utc>) -> usize {
self.persistable_row_count
}
fn minimum_unpersisted_age(&self) -> Option<Instant> {
fn minimum_unpersisted_age(&self) -> Option<DateTime<Utc>> {
self.minimum_unpersisted_age
}
}
@ -1112,7 +1106,7 @@ mod tests {
}
fn from_secs(secs: i64) -> DateTime<Utc> {
DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc)
Utc.timestamp(secs, 0)
}
#[test]
@ -1619,7 +1613,8 @@ mod tests {
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
..Default::default()
};
let now = Instant::now();
let now = from_secs(0);
let now_instant = Instant::now();
let partitions = vec![
// Insufficient rows and not old enough => don't persist but can compact
@ -1648,7 +1643,7 @@ mod tests {
TestChunk::new(ChunkId::new(6), 0, ChunkStorage::ObjectStoreOnly)
.with_min_timestamp(from_secs(5)),
])
.with_persistence(10, now - Duration::from_secs(10), from_secs(20)),
.with_persistence(10, now - chrono::Duration::seconds(10), from_secs(20)),
// Sufficient rows but conflicting compaction => prevent compaction
TestPartition::new(vec![
TestChunk::new(ChunkId::new(7), 0, ChunkStorage::ClosedMutableBuffer)
@ -1690,7 +1685,7 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(0), now);
lifecycle.check_for_work(from_secs(0), now_instant);
assert_eq!(
*db.events.read(),
vec![
@ -1714,21 +1709,22 @@ mod tests {
persist_age_threshold_seconds: NonZeroU32::new(20).unwrap(),
..Default::default()
};
let now = Instant::now();
let now = Utc::now();
let now_instant = Instant::now();
// This could occur if the in-memory contents of a partition are deleted, and
// compaction causes the chunks to be removed. In such a scenario the persistence
// windows will still think there are rows to be persisted
let partitions = vec![TestPartition::new(vec![]).with_persistence(
10,
now - Duration::from_secs(20),
now - chrono::Duration::seconds(20),
from_secs(20),
)];
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(from_secs(0), now);
lifecycle.check_for_work(now, now_instant);
assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]);
}
@ -1742,7 +1738,8 @@ mod tests {
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
..Default::default()
};
let now = Instant::now();
let now = Utc::now();
let now_instant = Instant::now();
let partitions = vec![
// Sufficient rows => could persist but should be suppressed
@ -1758,13 +1755,13 @@ mod tests {
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db);
lifecycle.check_for_work(from_secs(0), now);
lifecycle.check_for_work(now, now_instant);
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),]
);
lifecycle.check_for_work(from_secs(0), now);
lifecycle.check_for_work(now, now_instant);
assert_eq!(
*db.events.read(),
vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),]
@ -1772,7 +1769,7 @@ mod tests {
lifecycle.unsuppress_persistence();
lifecycle.check_for_work(from_secs(0), now);
lifecycle.check_for_work(now, now_instant);
assert_eq!(
*db.events.read(),
vec![

File diff suppressed because it is too large

View File

@ -9,6 +9,7 @@ description = "Tests of the query engine against different database configuratio
[dependencies]
async-trait = "0.1"
chrono = "0.4"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
once_cell = { version = "1.4.0", features = ["parking_lot"] }

View File

@ -4,7 +4,6 @@ pub mod delete;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use once_cell::sync::OnceCell;
@ -162,11 +161,7 @@ impl DbSetup for NoData {
// Now write the data in RB to object store but keep it in RB
let chunk_id = db
.persist_partition(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap()
@ -610,13 +605,9 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
db.compact_open_chunk("h2o", partition_key).await.unwrap();
db.persist_partition(
"h2o",
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition("h2o", partition_key, true)
.await
.unwrap();
write_lp(
&db,
@ -801,13 +792,9 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
.await
.unwrap();
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
let scenario4 = DbScenario {
scenario_name: "Data in both read buffer and object store".into(),
@ -823,11 +810,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) ->
.unwrap();
let id = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
@ -914,23 +897,15 @@ pub async fn make_two_chunk_scenarios(
let db = make_db().await.db;
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
let scenario5 = DbScenario {
scenario_name: "Data in two read buffer chunks and two parquet file chunks".into(),
@ -942,11 +917,7 @@ pub async fn make_two_chunk_scenarios(
let table_names = write_lp(&db, data1).await;
for table_name in &table_names {
let id = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
@ -957,11 +928,7 @@ pub async fn make_two_chunk_scenarios(
let table_names = write_lp(&db, data2).await;
for table_name in &table_names {
let id = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
@ -1003,13 +970,9 @@ pub async fn make_two_chunk_scenarios(
/// Rollover the mutable buffer and load chunk 0 to the read buffer and object store
pub async fn rollover_and_load(db: &Arc<Db>, partition_key: &str, table_name: &str) {
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
// This function loads one chunk of lp data into RUB for testing predicate pushdown
@ -1035,11 +998,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
let table_names = write_lp(&db, data).await;
for table_name in &table_names {
let id = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
@ -1126,11 +1085,8 @@ impl DbSetup for ChunkOrder {
let partition = partition.read();
let chunks = LockablePartition::chunks(&partition);
let mut partition = partition.upgrade();
let flush_handle = LockablePartition::prepare_persist(
&mut partition,
Instant::now() + Duration::from_secs(1),
)
.unwrap();
let flush_handle =
LockablePartition::prepare_persist(&mut partition, chrono::MAX_DATETIME).unwrap();
(chunks, flush_handle)
};

View File

@ -9,7 +9,6 @@ use async_trait::async_trait;
use query::QueryChunk;
use std::fmt::Display;
use std::sync::Arc;
use std::time::{Duration, Instant};
use server::db::test_helpers::write_lp;
use server::utils::make_db;
@ -652,11 +651,7 @@ async fn make_chunk_with_deletes_at_different_stages(
match chunk_stage {
ChunkStage::RubOs | ChunkStage::Os => {
let chunk_result = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap();
@ -778,11 +773,7 @@ async fn make_different_stage_chunks_with_deletes_scenario(
match chunk_data.chunk_stage {
ChunkStage::RubOs | ChunkStage::Os => {
let chunk = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap();

View File

@ -1433,13 +1433,9 @@ mod tests {
assert!(db.partition_summary("table_1", "partition_by_a").is_some());
// persist one partition
db.persist_partition(
"table_1",
"partition_by_b",
Instant::now() + Duration::from_secs(2),
)
.await
.unwrap();
db.persist_partition("table_1", "partition_by_b", true)
.await
.unwrap();
// shutdown first database
database.shutdown();

View File

@ -300,8 +300,10 @@ pub struct Db {
/// - to keep the lifecycle state (e.g. the number of running compactions) around
lifecycle_policy: tokio::sync::Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
/// TESTING ONLY: Mocked `Instant::now()` for the background worker
background_worker_now_override: Mutex<Option<Instant>>,
/// TESTING ONLY: Mocked `Utc::now()` for the background worker
///
/// TODO: Replace with TimeProvider (#2722)
now_override: Mutex<Option<DateTime<Utc>>>,
/// To-be-written delete predicates.
delete_predicates_mailbox: Mutex<Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>>,
@ -357,7 +359,7 @@ impl Db {
write_buffer_producer: database_to_commit.write_buffer_producer,
cleanup_lock: Default::default(),
lifecycle_policy: tokio::sync::Mutex::new(None),
background_worker_now_override: Default::default(),
now_override: Default::default(),
delete_predicates_mailbox: Default::default(),
};
let this = Arc::new(this);
@ -665,31 +667,17 @@ impl Db {
/// Persist given partition.
///
/// If `force` is `true` will persist all unpersisted data regardless of arrival time
///
/// Errors if there is nothing to persist at the moment as per the lifecycle rules. If successful it returns the
/// chunk that contains the persisted data.
///
/// The `now` timestamp should normally be `Instant::now()` but can be altered for testing.
pub async fn persist_partition(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
now: Instant,
force: bool,
) -> Result<Option<Arc<DbChunk>>> {
self.persist_partition_with_timestamp(table_name, partition_key, now, Utc::now)
.await
}
/// Internal use only for testing.
async fn persist_partition_with_timestamp<F>(
self: &Arc<Self>,
table_name: &str,
partition_key: &str,
now: Instant,
f_parquet_creation_timestamp: F,
) -> Result<Option<Arc<DbChunk>>>
where
F: Fn() -> DateTime<Utc> + Send,
{
// Use explicit scope to ensure the async generator doesn't
// assume the locks have to possibly live across the `await`
let fut = {
@ -702,7 +690,10 @@ impl Db {
// get flush handle
let flush_handle = partition
.persistence_windows_mut()
.map(|window| window.flush_handle(now))
.map(|window| match force {
true => window.flush_all_handle(),
false => window.flush_handle(self.utc_now()),
})
.flatten()
.context(CannotFlushPartition {
table_name,
@ -732,13 +723,8 @@ impl Db {
})
.collect::<Result<Vec<_>, _>>()?;
let (_, fut) = lifecycle::persist_chunks(
partition,
chunks,
flush_handle,
f_parquet_creation_timestamp,
)
.context(LifecycleError)?;
let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle)
.context(LifecycleError)?;
fut
};
@ -845,9 +831,7 @@ impl Db {
.as_mut()
.expect("lifecycle policy should be initialized");
policy
.check_for_work(Utc::now(), self.background_worker_now())
.await
policy.check_for_work(self.utc_now(), Instant::now()).await
}
};
@ -937,11 +921,11 @@ impl Db {
info!("finished db background worker");
}
/// `Instant::now()` that is used by the background worker. Can be mocked for testing.
fn background_worker_now(&self) -> Instant {
self.background_worker_now_override
.lock()
.unwrap_or_else(Instant::now)
/// `Utc::now()` that is used by `Db`. Can be mocked for testing.
///
/// TODO: Remove (#2722)
fn utc_now(&self) -> DateTime<Utc> {
self.now_override.lock().unwrap_or_else(Utc::now)
}
async fn cleanup_unreferenced_parquet_files(
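Not part of this diff — a self-contained sketch of the `now_override` pattern introduced above, using a std Mutex (the real Db uses its own lock type and threads `utc_now()` through the lifecycle and persistence-window calls). Tests pin the clock by writing to the override; production code falls through to `Utc::now()`:

use chrono::{DateTime, Utc};
use std::sync::Mutex;

struct Clock {
    /// TESTING ONLY: when set, `utc_now()` returns this instead of the wall clock.
    now_override: Mutex<Option<DateTime<Utc>>>,
}

impl Clock {
    fn new() -> Self {
        Self {
            now_override: Mutex::new(None),
        }
    }

    /// `Utc::now()` unless a test has installed an override.
    fn utc_now(&self) -> DateTime<Utc> {
        self.now_override.lock().unwrap().unwrap_or_else(Utc::now)
    }
}

fn main() {
    let clock = Clock::new();

    // Default: real wall clock.
    let _real = clock.utc_now();

    // A test pins the clock, e.g. to make buffered writes look old enough to persist.
    let pinned = Utc::now() + chrono::Duration::seconds(60);
    *clock.now_override.lock().unwrap() = Some(pinned);
    assert_eq!(clock.utc_now(), pinned);
}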
@ -1233,21 +1217,21 @@ impl Db {
row_count,
min_time,
max_time,
self.background_worker_now(),
self.utc_now(),
);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
self.background_worker_now(),
self.utc_now(),
);
windows.add_range(
sequence,
row_count,
min_time,
max_time,
self.background_worker_now(),
self.utc_now(),
);
partition.set_persistence_windows(windows);
}
@ -1465,6 +1449,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use chrono::{DateTime, TimeZone};
use futures::{stream, StreamExt, TryStreamExt};
use predicate::delete_expr::DeleteExpr;
use tokio_util::sync::CancellationToken;
@ -1808,13 +1793,9 @@ mod tests {
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size);
let t6_write = t5_write + chrono::Duration::seconds(1);
*db.now_override.lock() = Some(t6_write);
let chunk_id = db
.persist_partition_with_timestamp(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(1),
|| t6_write,
)
.persist_partition("cpu", "1970-01-01T00", true)
.await
.unwrap()
.unwrap()
@ -2244,13 +2225,9 @@ mod tests {
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
let t3_persist = t2_write + chrono::Duration::seconds(1);
*db.now_override.lock() = Some(t3_persist);
let pq_chunk = db
.persist_partition_with_timestamp(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
|| t3_persist,
)
.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap();
@ -2346,13 +2323,9 @@ mod tests {
.unwrap();
// Write the RB chunk to Object Store but keep it in RB
let t3_persist = t2_write + chrono::Duration::seconds(1);
*db.now_override.lock() = Some(t3_persist);
let pq_chunk = db
.persist_partition_with_timestamp(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
|| t3_persist,
)
.persist_partition("cpu", partition_key, true)
.await
.unwrap()
.unwrap();
@ -2800,14 +2773,11 @@ mod tests {
// Persist rb to parquet os
let t4_persist = t3_write + chrono::Duration::seconds(1);
db.persist_partition_with_timestamp(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(1),
|| t4_persist,
)
.await
.unwrap();
*db.now_override.lock() = Some(t4_persist);
db.persist_partition("cpu", "1970-01-01T00", true)
.await
.unwrap()
.unwrap();
// Check first/last write times on the chunks at this point
let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return");
@ -2950,13 +2920,9 @@ mod tests {
db.compact_partition("cpu", "1970-01-01T00").await.unwrap();
// write the read buffer chunk to object store
db.persist_partition(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition("cpu", "1970-01-01T00", true)
.await
.unwrap();
// write into a separate partition
write_lp(&db, "cpu bar=1 400000000000000").await;
@ -3136,13 +3102,9 @@ mod tests {
assert_ne!(mb_chunk.id(), rb_chunk.id());
// RB => OS
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
// we should have chunks in both the read buffer only
assert!(mutable_chunk_ids(&db, partition_key).is_empty());
@ -3694,13 +3656,9 @@ mod tests {
let partition_key = "part_a";
write_lp(&db, "cpu,part=a row=10,selector=0i 10").await;
write_lp(&db, "cpu,part=a row=11,selector=1i 11").await;
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
// 2: RUB
let partition_key = "part_b";
@ -3719,12 +3677,9 @@ mod tests {
let partition_key = "part_d";
write_lp(&db, "cpu,part=d row=40,selector=0i 40").await;
write_lp(&db, "cpu,part=d row=41,selector=1i 41").await;
let chunk_id = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
@ -3749,13 +3704,9 @@ mod tests {
// ==================== do: preserve another partition ====================
let partition_key = "part_b";
db.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
// ==================== do: use background worker for a short while ====================
let iters_start = db.worker_iterations_delete_predicate_preservation();
@ -4041,13 +3992,9 @@ mod tests {
));
// once persisted drop should work
db.persist_partition(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition("cpu", partition_key, true)
.await
.unwrap();
db.drop_partition("cpu", partition_key).await.unwrap();
// no chunks left
@ -4071,13 +4018,9 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=2 20").await;
let partition_key = "1970-01-01T00";
db.persist_partition(
"cpu",
partition_key,
Instant::now() + Duration::from_secs(1),
)
.await
.unwrap();
db.persist_partition("cpu", partition_key, true)
.await
.unwrap();
// query data before drop
let expected = vec![
@ -4117,11 +4060,7 @@ mod tests {
// Write the RB chunk to Object Store but keep it in RB
let chunk = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap();

View File

@ -568,7 +568,6 @@ mod tests {
utils::make_db,
};
use data_types::chunk_metadata::ChunkStorage;
use std::time::{Duration, Instant};
async fn test_chunk_access(chunk: &CatalogChunk) {
let t1 = chunk.access_recorder().get_metrics();
@ -661,11 +660,7 @@ mod tests {
write_lp_with_time(&db, "cpu,tag=1 bar=1 1", creation_time).await;
let id = db
.persist_partition(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(10000),
)
.persist_partition("cpu", "1970-01-01T00", true)
.await
.unwrap()
.unwrap()
@ -690,19 +685,14 @@ mod tests {
async fn parquet_snapshot() {
let db = make_db().await.db;
let before_creation = Utc::now();
write_lp(&db, "cpu,tag=1 bar=1 1").await;
let after_creation = Utc::now();
write_lp(&db, "cpu,tag=2 bar=2 2").await;
let after_write = Utc::now();
let w0 = Utc::now();
write_lp_with_time(&db, "cpu,tag=1 bar=1 1", w0).await;
let w1 = w0 + chrono::Duration::seconds(4);
write_lp_with_time(&db, "cpu,tag=2 bar=2 2", w1).await;
db.persist_partition(
"cpu",
"1970-01-01T00",
Instant::now() + Duration::from_secs(10000),
)
.await
.unwrap();
db.persist_partition("cpu", "1970-01-01T00", true)
.await
.unwrap();
let chunks = db.catalog.chunks();
assert_eq!(chunks.len(), 1);
@ -713,9 +703,7 @@ mod tests {
let first_write = snapshot.time_of_first_write();
let last_write = snapshot.time_of_last_write();
assert!(before_creation < first_write);
assert!(first_write < after_creation);
assert!(first_write < last_write);
assert!(last_write < after_write);
assert_eq!(w0, first_write);
assert_eq!(w1, last_write);
}
}

View File

@ -29,7 +29,6 @@ use std::{
convert::TryInto,
fmt::Display,
sync::{Arc, Weak},
time::Instant,
};
use tracker::{RwLock, TaskTracker};
@ -203,7 +202,7 @@ impl LockablePartition for LockableCatalogPartition {
fn prepare_persist(
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
now: Instant,
now: DateTime<Utc>,
) -> Option<Self::PersistHandle> {
let window = partition.persistence_windows_mut().unwrap();
let handle = window.flush_handle(now);
@ -217,7 +216,7 @@ impl LockablePartition for LockableCatalogPartition {
handle: Self::PersistHandle,
) -> Result<TaskTracker<Job>, Self::Error> {
info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks");
let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0, Utc::now)?;
let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0)?;
let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") });
Ok(tracker)
}
@ -288,13 +287,13 @@ impl LifecyclePartition for Partition {
.unwrap_or(true)
}
fn persistable_row_count(&self, now: Instant) -> usize {
fn persistable_row_count(&self, now: DateTime<Utc>) -> usize {
self.persistence_windows()
.map(|w| w.persistable_row_count(now))
.unwrap_or(0)
}
fn minimum_unpersisted_age(&self) -> Option<Instant> {
fn minimum_unpersisted_age(&self) -> Option<DateTime<Utc>> {
self.persistence_windows()
.and_then(|w| w.minimum_unpersisted_age())
}

View File

@ -18,19 +18,18 @@ use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
/// Split and then persist the provided chunks
///
/// `flush_handle` describes both what to persist and also acts as a transaction
/// on the persistence windows
///
/// TODO: Replace low-level locks with transaction object
pub fn persist_chunks<F>(
pub fn persist_chunks(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
flush_handle: FlushHandle,
f_parquet_creation_timestamp: F,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Option<Arc<DbChunk>>>> + Send>,
)>
where
F: Fn() -> DateTime<Utc> + Send,
{
)> {
let now = std::time::Instant::now(); // time persist duration.
let db = Arc::clone(&partition.data().db);
let addr = partition.addr().clone();
@ -188,13 +187,7 @@ where
};
let to_persist = to_persist.write();
write_chunk_to_object_store(
partition_write,
to_persist,
flush_handle,
f_parquet_creation_timestamp,
)?
.1
write_chunk_to_object_store(partition_write, to_persist, flush_handle)?.1
};
// Wait for write operation to complete
@ -227,7 +220,7 @@ mod tests {
use query::QueryDatabase;
use std::{
num::{NonZeroU32, NonZeroU64},
time::{Duration, Instant},
time::Duration,
};
async fn test_db() -> Arc<Db> {
@ -266,14 +259,14 @@ mod tests {
let mut partition = partition.upgrade();
let handle = LockablePartition::prepare_persist(&mut partition, Instant::now())
let handle = LockablePartition::prepare_persist(&mut partition, Utc::now())
.unwrap()
.0;
assert_eq!(handle.timestamp(), Utc.timestamp_nanos(10));
let chunks: Vec<_> = chunks.map(|x| x.upgrade()).collect();
persist_chunks(partition, chunks, handle, Utc::now)
persist_chunks(partition, chunks, handle)
.unwrap()
.1
.await
@ -292,16 +285,16 @@ mod tests {
async fn test_persist_delete() {
let db = test_db().await;
let late_arrival = Duration::from_secs(1);
let late_arrival = chrono::Duration::seconds(1);
let t0 = Instant::now();
let t0 = Utc::now();
let t1 = t0 + late_arrival * 10;
let t2 = t1 + late_arrival * 10;
*db.background_worker_now_override.lock() = Some(t0);
*db.now_override.lock() = Some(t0);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
*db.background_worker_now_override.lock() = Some(t1);
*db.now_override.lock() = Some(t1);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23").await;
let partition_keys = db.partition_keys().unwrap();
@ -318,8 +311,9 @@ mod tests {
db.delete("cpu", predicate).await.unwrap();
// Try to persist first write but it has been deleted
*db.now_override.lock() = Some(t0 + late_arrival);
let maybe_chunk = db
.persist_partition("cpu", partition_key.as_str(), t0 + late_arrival)
.persist_partition("cpu", partition_key.as_str(), false)
.await
.unwrap();
@ -341,13 +335,13 @@ mod tests {
);
// Add a second set of writes one of which overlaps the above chunk
*db.background_worker_now_override.lock() = Some(t2);
*db.now_override.lock() = Some(t2);
write_lp(db.as_ref(), "cpu,tag1=foo bar=2 23").await;
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26").await;
// Persist second write but not third
let maybe_chunk = db
.persist_partition("cpu", partition_key.as_str(), t2)
.persist_partition("cpu", partition_key.as_str(), false)
.await
.unwrap();
assert!(maybe_chunk.is_some());
@ -384,8 +378,9 @@ mod tests {
db.delete("cpu", predicate).await.unwrap();
// Try to persist third set of writes
*db.now_override.lock() = Some(t2 + late_arrival);
let maybe_chunk = db
.persist_partition("cpu", partition_key.as_str(), t2 + late_arrival)
.persist_partition("cpu", partition_key.as_str(), false)
.await
.unwrap();
@ -404,10 +399,10 @@ mod tests {
async fn persist_compacted_deletes() {
let db = test_db().await;
let late_arrival = Duration::from_secs(1);
let t0 = Instant::now();
let late_arrival = chrono::Duration::seconds(1);
let t0 = Utc::now();
*db.background_worker_now_override.lock() = Some(t0);
*db.now_override.lock() = Some(t0);
write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await;
let partition_keys = db.partition_keys().unwrap();
@ -439,8 +434,9 @@ mod tests {
// Persistence windows unaware rows have been deleted
assert!(!partition.read().persistence_windows().unwrap().is_empty());
*db.now_override.lock() = Some(t0 + late_arrival);
let maybe_chunk = db
.persist_partition("cpu", partition_key.as_str(), t0 + late_arrival * 2)
.persist_partition("cpu", partition_key.as_str(), false)
.await
.unwrap();

View File

@ -12,7 +12,6 @@ use crate::db::{
use ::lifecycle::LifecycleWriteGuard;
use chrono::{DateTime, Utc};
use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job};
use internal_types::selection::Selection;
use observability_deps::tracing::{debug, warn};
@ -39,20 +38,20 @@ use super::{
/// The implementation for writing a chunk to the object store
///
/// `flush_handle` describes both what to persist and also acts as a transaction
/// on the persistence windows
///
/// Returns a future registered with the tracker registry, and the corresponding tracker
///
/// The caller can either spawn this future to tokio, or block directly on it
pub(super) fn write_chunk_to_object_store<F>(
pub(super) fn write_chunk_to_object_store(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>,
flush_handle: FlushHandle,
f_parquet_creation_timestamp: F,
) -> Result<(
TaskTracker<Job>,
TrackedFuture<impl Future<Output = Result<Arc<DbChunk>>> + Send>,
)>
where
F: Fn() -> DateTime<Utc> + Send,
{
)> {
let db = Arc::clone(&chunk.data().db);
let addr = chunk.addr().clone();
let table_name = Arc::clone(&addr.table_name);
@ -125,7 +124,7 @@ where
// IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted
// between creation and the transaction commit.
let metadata = IoxMetadata {
creation_timestamp: f_parquet_creation_timestamp(),
creation_timestamp: db.utc_now(),
table_name: Arc::clone(&table_name),
partition_key: Arc::clone(&partition_key),
chunk_id: addr.chunk_id,

View File

@ -130,7 +130,7 @@ pub async fn seek_to_end(db: &Db, write_buffer: &mut dyn WriteBufferReading) ->
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
db.utc_now(),
);
windows.mark_seen_and_persisted(&dummy_checkpoint);
partition.set_persistence_windows(windows);
@ -236,7 +236,7 @@ pub async fn perform_replay(
|sequence, partition_key, table_batch| {
filter_entry(sequence, partition_key, table_batch, replay_plan)
},
Utc::now(),
db.utc_now(),
) {
Ok(_) => {
break;
@ -289,7 +289,7 @@ pub async fn perform_replay(
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
db.utc_now(),
);
windows.mark_seen_and_persisted(partition_checkpoint);
partition.set_persistence_windows(windows);
@ -418,7 +418,7 @@ mod tests {
};
use arrow_util::assert_batches_eq;
use chrono::Utc;
use chrono::{DateTime, Utc};
use data_types::{
database_rules::{PartitionTemplate, Partitioner, TemplatePart},
server_id::ServerId,
@ -576,7 +576,7 @@ mod tests {
db_name,
partition_template.clone(),
self.catalog_transactions_until_checkpoint,
Instant::now(),
Utc::now(),
)
.await;
@ -610,7 +610,7 @@ mod tests {
join_handle.await.unwrap();
// remember time
let now = test_db.db.background_worker_now_override.lock().unwrap();
let now = test_db.db.now_override.lock().unwrap();
// drop old DB
drop(test_db);
@ -656,14 +656,7 @@ mod tests {
for (table_name, partition_key) in partitions {
println!("Persist {}:{}", table_name, partition_key);
loop {
match db
.persist_partition(
table_name,
partition_key,
db.background_worker_now(),
)
.await
{
match db.persist_partition(table_name, partition_key, false).await {
Ok(_) => break,
Err(crate::db::Error::CannotFlushPartition { .. }) => {
// cannot persist right now because of some lifecycle action, so wait a bit
@ -700,8 +693,8 @@ mod tests {
}
}
Step::MakeWritesPersistable => {
let mut guard = test_db.db.background_worker_now_override.lock();
*guard = Some(guard.unwrap() + Duration::from_secs(60));
let mut guard = test_db.db.now_override.lock();
*guard = Some(guard.unwrap() + chrono::Duration::seconds(60));
}
Step::Assert(checks) => {
Self::eval_checks(&checks, true, &test_db).await;
@ -768,7 +761,7 @@ mod tests {
db_name: &'static str,
partition_template: PartitionTemplate,
catalog_transactions_until_checkpoint: NonZeroU64,
now: Instant,
now: DateTime<Utc>,
) -> (TestDb, CancellationToken, JoinHandle<()>) {
let test_db = TestDb::builder()
.object_store(object_store)
@ -786,7 +779,7 @@ mod tests {
.await;
// Mock time
*test_db.db.background_worker_now_override.lock() = Some(now);
*test_db.db.now_override.lock() = Some(now);
// start background worker
let shutdown: CancellationToken = Default::default();

View File

@ -7,7 +7,7 @@ use std::{
convert::TryFrom,
num::{NonZeroU32, NonZeroU64},
sync::Arc,
time::{Duration, Instant},
time::Duration,
};
use tokio::{
runtime::{Handle, Runtime},
@ -88,11 +88,7 @@ async fn setup(object_store: Arc<ObjectStore>, done: &Mutex<bool>) {
.unwrap();
let chunk = db
.persist_partition(
table_name,
partition_key,
Instant::now() + Duration::from_secs(1),
)
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap();

View File

@ -1,7 +1,6 @@
use std::convert::{TryFrom, TryInto};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
use data_types::chunk_metadata::ChunkId;
use data_types::{server_id::ServerId, DatabaseName};
@ -573,7 +572,7 @@ where
.db(&db_name)
.map_err(default_server_error_handler)?;
db.persist_partition(&table_name, &partition_key, Instant::now())
db.persist_partition(&table_name, &partition_key, false)
.await
.map_err(default_db_error_handler)?;