Merge branch 'main' into debugkafka
commit 76d9b8f7cc
@@ -71,6 +71,7 @@ pub(crate) fn compact_chunks(
     let ctx = db.exec.new_context(ExecutorType::Reorg);

     let fut = async move {
+        let fut_now = std::time::Instant::now();
         let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
         let key_str = format!("\"{}\"", key); // for logging

@@ -108,7 +109,8 @@ pub(crate) fn compact_chunks(

         info!(input_chunks=query_chunks.len(), rub_row_groups=rb_row_groups,
               input_rows=input_rows, output_rows=guard.table_summary().count(),
-              sort_key=%key_str, compaction_took = ?elapsed, rows_per_sec=?throughput, "chunk(s) compacted");
+              sort_key=%key_str, compaction_took = ?elapsed, fut_execution_duration= ?fut_now.elapsed(),
+              rows_per_sec=?throughput, "chunk(s) compacted");

         Ok(DbChunk::snapshot(&guard))
     };

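Note on the two durations logged above: compaction_took reports the `elapsed` value measured elsewhere in the function (presumably around the compaction work itself), while the new fut_execution_duration spans the entire future body from its first poll, including time spent suspended at .await points. A minimal standalone sketch of the two-timer pattern (assumes the Tokio runtime the crate's tests already use; the workload is illustrative):

    use std::time::{Duration, Instant};

    #[tokio::main]
    async fn main() {
        let fut_now = Instant::now(); // starts when the future body first runs

        // Time spent suspended here is invisible to a work-only timer...
        tokio::time::sleep(Duration::from_millis(50)).await;

        let work_start = Instant::now();
        let _sum: u64 = (0..1_000_000u64).sum(); // stand-in for the real work
        let elapsed = work_start.elapsed();

        // ...but is included in the future-wide timer, so
        // fut_now.elapsed() >= elapsed (plus roughly the 50 ms sleep).
        println!("work: {:?}, future: {:?}", elapsed, fut_now.elapsed());
    }
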
@@ -10,6 +10,7 @@ use crate::{
 };

 use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder};
+use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks};

 #[tokio::test]
 async fn test_chunk_is_persisted_automatically() {

@@ -53,21 +54,25 @@ async fn test_full_lifecycle() {
     let fixture = ServerFixture::create_shared().await;
     let mut write_client = fixture.write_client();

-    let db_name = rand_name();
-    DatabaseBuilder::new(db_name.clone())
-        .persist(true)
-        // wait 2 seconds for the data to arrive (to ensure we compact a single chunk)
-        .persist_age_threshold_seconds(2)
-        .late_arrive_window_seconds(1)
-        .build(fixture.grpc_channel())
-        .await;
-
     // write in enough data to exceed the soft limit (512K) and
     // expect that it compacts, persists and then unloads the data from memory
     let num_payloads = 10;
     let num_duplicates = 2;
     let payload_size = 1_000;

+    let total_rows = num_payloads * num_duplicates * payload_size;
+
+    let db_name = rand_name();
+    DatabaseBuilder::new(db_name.clone())
+        .persist(true)
+        // Each write should go into a separate chunk to test compaction
+        .mub_row_threshold(payload_size)
+        // Only trigger persistence once we've finished writing
+        .persist_row_threshold(total_rows)
+        .persist_age_threshold_seconds(1000)
+        // A low late arrival time to speed up the test
+        .late_arrive_window_seconds(1)
+        .build(fixture.grpc_channel())
+        .await;
+
     let payloads: Vec<_> = (0..num_payloads)
         .map(|x| {
             (0..payload_size)

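The rearranged setup makes the lifecycle deterministic: the builder now needs `total_rows`, so the constants come first. A quick arithmetic check of the thresholds (a standalone sketch, not part of the change):

    fn main() {
        let num_payloads: u64 = 10;
        let num_duplicates: u64 = 2;
        let payload_size: u64 = 1_000;

        // mub_row_threshold == payload_size: every 1,000-line write closes
        // its own mutable-buffer chunk, giving compaction several inputs.
        // persist_row_threshold == total_rows: persistence cannot fire
        // before the final write lands, and the 1000 s age threshold
        // never fires within the test.
        let total_rows = num_payloads * num_duplicates * payload_size;
        assert_eq!(total_rows, 20_000);
    }
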
@@ -76,24 +81,17 @@ async fn test_full_lifecycle() {
         })
         .collect();

-    for payload in payloads.iter().take(num_payloads - 1) {
+    for payload in &payloads {
         // Writing the same data multiple times should be compacted away
         for _ in 0..num_duplicates {
             let num_lines_written = write_client
                 .write(&db_name, payload)
                 .await
                 .expect("successful write");
-            assert_eq!(num_lines_written, payload_size);
+            assert_eq!(num_lines_written, payload_size as usize);
         }
     }

-    // Don't duplicate last write as it is what crosses the persist row threshold
-    let num_lines_written = write_client
-        .write(&db_name, payloads.last().unwrap())
-        .await
-        .expect("successful write");
-    assert_eq!(num_lines_written, payload_size);
-
     wait_for_exact_chunk_states(
         &fixture,
         &db_name,

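The `as usize` casts are a knock-on effect of the builder change: `payload_size` is now inferred as `u64` because it feeds `mub_row_threshold(u64)` (see the DatabaseBuilder hunk below), while the write client reports the number of lines written as `usize`. A standalone sketch of the type mismatch:

    fn main() {
        let payload_size: u64 = 1_000;        // u64: it feeds mub_row_threshold(u64)
        let num_lines_written: usize = 1_000; // the write client returns usize
        assert_eq!(num_lines_written, payload_size as usize); // cast required
    }
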
@@ -102,11 +100,27 @@ async fn test_full_lifecycle() {
     )
     .await;

+    // Expect compaction to have occurred
+    let performed_compaction = fixture
+        .operations_client()
+        .list_operations()
+        .await
+        .unwrap()
+        .iter()
+        .any(|operation| match operation.metadata().job {
+            Some(Job::CompactChunks(CompactChunks {
+                db_name: operation_db_name,
+                ..
+            })) => operation_db_name == db_name,
+            _ => false,
+        });
+    assert!(performed_compaction);
+
     // Expect them to have been compacted into a single read buffer
     // with the duplicates eliminated
     let chunks = list_chunks(&fixture, &db_name).await;
     assert_eq!(chunks.len(), 1);
-    assert_eq!(chunks[0].row_count, num_payloads * payload_size)
+    assert_eq!(chunks[0].row_count, (num_payloads * payload_size) as usize)
 }

 #[tokio::test]

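The new assertion scans every tracked operation and keeps only CompactChunks jobs for this database. A stripped-down sketch of the Option-plus-struct-variant match it relies on (the types here are stand-ins, not the generated protobuf ones):

    // Stand-in types mirroring the shape of the operation metadata.
    struct CompactChunks {
        db_name: String,
    }

    enum Job {
        CompactChunks(CompactChunks),
        Other,
    }

    // `..` ignores any remaining fields; the pattern binds db_name locally.
    fn is_compaction_for(job: Option<Job>, db: &str) -> bool {
        match job {
            Some(Job::CompactChunks(CompactChunks { db_name, .. })) => db_name == db,
            _ => false,
        }
    }

    fn main() {
        let job = Some(Job::CompactChunks(CompactChunks { db_name: "db0".into() }));
        assert!(is_compaction_for(job, "db0"));
        assert!(!is_compaction_for(Some(Job::Other), "db0"));
    }
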
@@ -163,8 +177,6 @@ async fn test_query_chunk_after_restart() {

 /// Create a closed read buffer chunk and return its id
 async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> u32 {
-    use influxdb_iox_client::management::generated_types::operation_metadata::Job;
-
     let mut management_client = fixture.management_client();
     let mut write_client = fixture.write_client();
     let mut operations_client = fixture.operations_client();

@@ -317,11 +317,21 @@ impl DatabaseBuilder {
         self
     }

+    pub fn mub_row_threshold(mut self, threshold: u64) -> Self {
+        self.lifecycle_rules.mub_row_threshold = threshold;
+        self
+    }
+
     pub fn persist_age_threshold_seconds(mut self, threshold: u32) -> Self {
         self.lifecycle_rules.persist_age_threshold_seconds = threshold;
         self
     }

+    pub fn persist_row_threshold(mut self, threshold: u64) -> Self {
+        self.lifecycle_rules.persist_row_threshold = threshold;
+        self
+    }
+
     pub fn late_arrive_window_seconds(mut self, late_arrive_window_seconds: u32) -> Self {
         self.lifecycle_rules.late_arrive_window_seconds = late_arrive_window_seconds;
         self
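The new setters follow the same consuming-builder idiom as the existing ones: take `mut self`, mutate a field, return `Self`, so calls chain fluently. A minimal standalone sketch of that shape (with a stand-in LifecycleRules holding only the fields needed here):

    #[derive(Default)]
    struct LifecycleRules {
        mub_row_threshold: u64,
        persist_row_threshold: u64,
    }

    #[derive(Default)]
    struct DatabaseBuilder {
        lifecycle_rules: LifecycleRules,
    }

    impl DatabaseBuilder {
        fn mub_row_threshold(mut self, threshold: u64) -> Self {
            self.lifecycle_rules.mub_row_threshold = threshold;
            self
        }

        fn persist_row_threshold(mut self, threshold: u64) -> Self {
            self.lifecycle_rules.persist_row_threshold = threshold;
            self
        }
    }

    fn main() {
        // Each setter consumes the builder and hands back the updated one.
        let b = DatabaseBuilder::default()
            .mub_row_threshold(1_000)
            .persist_row_threshold(20_000);
        assert_eq!(b.lifecycle_rules.persist_row_threshold, 20_000);
    }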