test: make partioning in DB tests consistent w/ DB rules
parent
ed3ebdcbd2
commit
5ca9760c94
|
@ -1813,7 +1813,7 @@ pub mod test_helpers {
|
|||
/// Converts the line protocol to a collection of `Entry` with a single
|
||||
/// shard and a single partition, which is useful for testing when `lp` is
|
||||
/// large. Batches are sized according to LP_BATCH_SIZE.
|
||||
pub fn lp_to_entries(lp: &str) -> Vec<Entry> {
|
||||
pub fn lp_to_entries(lp: &str, partitioner: &impl Partitioner) -> Vec<Entry> {
|
||||
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
|
||||
|
||||
let default_time = Utc::now().timestamp_nanos();
|
||||
|
@ -1821,16 +1821,11 @@ pub mod test_helpers {
|
|||
lines
|
||||
.chunks(LP_BATCH_SIZE)
|
||||
.map(|batch| {
|
||||
lines_to_sharded_entries(
|
||||
batch,
|
||||
default_time,
|
||||
sharder(1).as_ref(),
|
||||
&hour_partitioner(),
|
||||
)
|
||||
.unwrap()
|
||||
.pop()
|
||||
.unwrap()
|
||||
.entry
|
||||
lines_to_sharded_entries(batch, default_time, sharder(1).as_ref(), partitioner)
|
||||
.unwrap()
|
||||
.pop()
|
||||
.unwrap()
|
||||
.entry
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
|
|
@ -795,7 +795,10 @@ pub mod test_helpers {
|
|||
|
||||
/// Try to write lineprotocol data and return all tables that where written.
|
||||
pub async fn try_write_lp(db: &Db, lp: &str) -> Result<Vec<String>> {
|
||||
let entries = lp_to_entries(lp);
|
||||
let entries = {
|
||||
let partitioner = &db.rules.read().partition_template;
|
||||
lp_to_entries(lp, partitioner)
|
||||
};
|
||||
|
||||
let mut tables = HashSet::new();
|
||||
for entry in entries {
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::{borrow::Cow, convert::TryFrom, num::NonZeroU64, sync::Arc, time::Durat
|
|||
|
||||
use data_types::{
|
||||
chunk_metadata::{ChunkStorage, ChunkSummary},
|
||||
database_rules::DatabaseRules,
|
||||
database_rules::{DatabaseRules, PartitionTemplate, TemplatePart},
|
||||
server_id::ServerId,
|
||||
DatabaseName,
|
||||
};
|
||||
|
@ -37,6 +37,7 @@ pub struct TestDbBuilder {
|
|||
worker_cleanup_avg_sleep: Option<Duration>,
|
||||
write_buffer: Option<Arc<dyn WriteBuffer>>,
|
||||
catalog_transactions_until_checkpoint: Option<NonZeroU64>,
|
||||
partition_template: Option<PartitionTemplate>,
|
||||
}
|
||||
|
||||
impl TestDbBuilder {
|
||||
|
@ -80,6 +81,16 @@ impl TestDbBuilder {
|
|||
rules.lifecycle_rules.catalog_transactions_until_checkpoint = v;
|
||||
}
|
||||
|
||||
// set partion template
|
||||
if let Some(partition_template) = self.partition_template {
|
||||
rules.partition_template = partition_template;
|
||||
} else {
|
||||
// default to hourly
|
||||
rules.partition_template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::TimeFormat("%Y-%m-%dT%H".to_string())],
|
||||
};
|
||||
}
|
||||
|
||||
let database_to_commit = DatabaseToCommit {
|
||||
rules,
|
||||
server_id,
|
||||
|
@ -125,6 +136,11 @@ impl TestDbBuilder {
|
|||
self.catalog_transactions_until_checkpoint = Some(interval);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn partition_template(mut self, template: PartitionTemplate) -> Self {
|
||||
self.partition_template = Some(template);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Used for testing: create a Database with a local store
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use entry::{test_helpers::lp_to_entries, Sequence};
|
||||
use entry::{
|
||||
test_helpers::{hour_partitioner, lp_to_entries},
|
||||
Sequence,
|
||||
};
|
||||
use flate2::read::GzDecoder;
|
||||
use mutable_buffer::chunk::{ChunkMetrics, MBChunk};
|
||||
use std::io::Read;
|
||||
|
@ -20,7 +23,7 @@ fn chunk(count: usize) -> MBChunk {
|
|||
|
||||
let sequence = Some(Sequence::new(1, 5));
|
||||
for _ in 0..count {
|
||||
for entry in lp_to_entries(&lp) {
|
||||
for entry in lp_to_entries(&lp, &hour_partitioner()) {
|
||||
for write in entry.partition_writes().iter().flatten() {
|
||||
for batch in write.table_batches() {
|
||||
chunk.write_table_batch(sequence.as_ref(), batch).unwrap();
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use entry::{test_helpers::lp_to_entries, Entry, Sequence};
|
||||
use entry::{
|
||||
test_helpers::{hour_partitioner, lp_to_entries},
|
||||
Entry, Sequence,
|
||||
};
|
||||
use flate2::read::GzDecoder;
|
||||
use mutable_buffer::chunk::{ChunkMetrics, MBChunk};
|
||||
use std::io::Read;
|
||||
|
@ -26,7 +29,7 @@ fn load_entries() -> Vec<Entry> {
|
|||
let mut gz = GzDecoder::new(&raw[..]);
|
||||
let mut lp = String::new();
|
||||
gz.read_to_string(&mut lp).unwrap();
|
||||
lp_to_entries(&lp)
|
||||
lp_to_entries(&lp, &hour_partitioner())
|
||||
}
|
||||
|
||||
pub fn write_mb(c: &mut Criterion) {
|
||||
|
|
Loading…
Reference in New Issue