feat: Remove the server_benchmarks crate (#4506)

pull/24376/head
Carol (Nichols || Goulding) 2022-05-02 14:20:01 -04:00 committed by GitHub
parent 48d2fe1396
commit e015d3bafb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 0 additions and 46438 deletions


@@ -35,7 +35,6 @@ workspace-members = [
"influxdb2_client",
"iox_data_generator",
"mutable_batch_tests",
"server_benchmarks",
]
third-party = [
{ name = "azure_core", git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "15de10cb58cf5cb271334a20e35fa9fbedd10244" },

Cargo.lock (generated): 23 lines changed

@@ -5520,29 +5520,6 @@ dependencies = [
"write_buffer",
]
[[package]]
name = "server_benchmarks"
version = "0.1.0"
dependencies = [
"arrow_util",
"chrono",
"criterion",
"data_types",
"datafusion 0.1.0",
"db",
"flate2",
"influxdb_line_protocol",
"influxdb_tsm",
"mutable_buffer",
"object_store",
"predicate",
"query",
"query_tests",
"rand",
"test_helpers",
"tokio",
]
[[package]]
name = "service_common"
version = "0.1.0"


@@ -68,7 +68,6 @@ members = [
"router2",
"schema",
"server",
"server_benchmarks",
"service_common",
"service_grpc_influxrpc",
"service_grpc_flight",


@@ -23,6 +23,5 @@ pub mod sql;
#[cfg(test)]
pub mod table_schema;
// Used by the `server_benchmarks` crate in addition to tests in this crate
pub mod db;
pub mod scenarios;


@@ -1,51 +0,0 @@
[package]
name = "server_benchmarks"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2021"
description = "Server-related benchmarks, grouped into their own crate to minimize dev build times"
[dependencies]
[dev-dependencies] # In alphabetical order
arrow_util = { path = "../arrow_util" }
criterion = { version = "0.3.4", features = ["async_tokio"] }
chrono = { version = "0.4", default-features = false }
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
db = { path = "../db" }
flate2 = "1.0.23"
influxdb_tsm = { path = "../influxdb_tsm" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
mutable_buffer = { path = "../mutable_buffer", features = ["nocache"] }
object_store = { path = "../object_store" }
predicate = { path = "../predicate" }
query = { path = "../query" }
query_tests = { path = "../query_tests" }
rand = "0.8.3"
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.18", features = ["macros", "parking_lot"] }
[[bench]]
name = "influxrpc"
harness = false
[[bench]]
name = "snapshot"
harness = false
[[bench]]
name = "encoders"
harness = false
[[bench]]
name = "line_parser"
harness = false
[[bench]]
name = "mapper"
harness = false
[[bench]]
name = "catalog_persistence"
harness = false
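Each `[[bench]]` target above names a file under `benches/` and sets `harness = false`, which hands `main` over to Criterion instead of the default libtest harness. A minimal sketch of such a bench file, purely for illustration (the `noop` benchmark name and body are made up, not one of the deleted benchmarks):

use criterion::{criterion_group, criterion_main, Criterion};

fn noop_benchmark(c: &mut Criterion) {
    // Measure a trivial closure; the real benchmarks above measure encoders, parsers, etc.
    c.bench_function("noop", |b| b.iter(|| 1 + 1));
}

criterion_group!(benches, noop_benchmark);
criterion_main!(benches);

Grouping all six targets in this one crate, per the package description above, kept Criterion and these heavy dev-dependencies out of the other crates' dev builds.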


@@ -1,172 +0,0 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, SamplingMode};
use data_types::{chunk_metadata::ChunkId, database_rules::LifecycleRules};
use db::{test_helpers::write_lp, utils::TestDb};
use object_store::{DynObjectStore, ObjectStoreImpl, ThrottleConfig};
use query::QueryChunk;
use std::{
convert::TryFrom,
num::{NonZeroU32, NonZeroU64},
sync::Arc,
time::Duration,
};
use tokio::{
runtime::{Handle, Runtime},
sync::Mutex,
task::block_in_place,
};
/// Checkpoint interval for preserved catalog.
const CHECKPOINT_INTERVAL: u64 = 10;
/// Number of chunks simulated for persistence.
///
/// Ideally this value is NOT divisible by [`CHECKPOINT_INTERVAL`], so that there are some transactions after the last
/// checkpoint.
const N_CHUNKS: u32 = 109;
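// Added note (not in the original): 109 mod CHECKPOINT_INTERVAL (10) = 9, so some
// transactions are written after the final checkpoint and catalog restore has to replay
// them in addition to loading the checkpoint itself.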
/// Number of tags for the test table.
const N_TAGS: usize = 10;
/// Number of fields for the test table.
const N_FIELDS: usize = 10;
/// Run all benchmarks that test catalog persistence.
fn benchmark_catalog_persistence(c: &mut Criterion) {
let object_store = create_throttled_store();
let setup_done = Mutex::new(None);
// benchmark reading the catalog
let mut group = c.benchmark_group("catalog");
group.measurement_time(Duration::from_secs(30));
group.sample_size(10);
group.sampling_mode(SamplingMode::Flat);
group.warm_up_time(Duration::from_secs(10));
group.bench_function("catalog_restore", |b| {
b.to_async(Runtime::new().unwrap()).iter_batched(
|| {
// Threaded runtime is already running.
block_in_place(|| {
Handle::current().block_on(setup(Arc::clone(&object_store), &setup_done))
})
},
|chunk_ids| {
let object_store = Arc::clone(&object_store);
async move {
let db = create_persisted_db(Arc::clone(&object_store)).await.db;
// test that data is actually loaded
let partition_key = "1970-01-01T00";
let table_name = "cpu";
let chunk_id = chunk_ids[0];
assert!(db
.table_summary(table_name, partition_key, chunk_id)
.is_some());
}
},
BatchSize::SmallInput,
);
});
group.finish();
}
/// Persist a database to the given object store with [`N_CHUNKS`] chunks.
async fn setup(
object_store: Arc<DynObjectStore>,
done: &Mutex<Option<Arc<Vec<ChunkId>>>>,
) -> Arc<Vec<ChunkId>> {
let mut guard = done.lock().await;
if let Some(chunk_ids) = guard.as_ref() {
return Arc::clone(chunk_ids);
}
let db = create_persisted_db(object_store).await.db;
let lp = create_lp(N_TAGS, N_FIELDS);
let partition_key = "1970-01-01T00";
let mut chunk_ids = vec![];
for _ in 0..N_CHUNKS {
let table_names = write_lp(&db, &lp);
for table_name in &table_names {
db.compact_open_chunk(table_name, partition_key)
.await
.unwrap();
let chunk = db
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap();
db.unload_read_buffer(table_name, partition_key, chunk.id())
.unwrap();
chunk_ids.push(chunk.id());
}
}
let chunk_ids = Arc::new(chunk_ids);
*guard = Some(Arc::clone(&chunk_ids));
chunk_ids
}
/// Create a persisted database and load its catalog.
#[inline(never)]
async fn create_persisted_db(object_store: Arc<DynObjectStore>) -> TestDb {
TestDb::builder()
.object_store(object_store)
.lifecycle_rules(LifecycleRules {
catalog_transactions_until_checkpoint: NonZeroU64::try_from(CHECKPOINT_INTERVAL)
.unwrap(),
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await
}
/// Create line protocol for a single entry with `n_tags` tags and `n_fields` fields.
///
/// The table is `"cpu"` and the timestamp is `0`.
fn create_lp(n_tags: usize, n_fields: usize) -> String {
let mut lp = "cpu".to_string();
for i in 0..n_tags {
lp.push_str(&format!(",tag_{}=x", i));
}
lp.push(' ');
for i in 0..n_fields {
if i > 0 {
lp.push(',')
}
lp.push_str(&format!("field_{}=1", i));
}
lp.push_str(" 0");
lp
}
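// Added illustration (not in the original file): create_lp(2, 2) returns
// "cpu,tag_0=x,tag_1=x field_0=1,field_1=1 0", i.e. one "cpu" point with two tags,
// two fields, and timestamp 0.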
/// Create object store with somewhat realistic operation latencies.
fn create_throttled_store() -> Arc<DynObjectStore> {
let config = ThrottleConfig {
// for every call: assume a 100ms latency
wait_delete_per_call: Duration::from_millis(100),
wait_get_per_call: Duration::from_millis(100),
wait_list_per_call: Duration::from_millis(100),
wait_list_with_delimiter_per_call: Duration::from_millis(100),
wait_put_per_call: Duration::from_millis(100),
// for list operations: assume we need 1 call per 1k entries at 100ms
wait_list_per_entry: Duration::from_millis(100) / 1_000,
wait_list_with_delimiter_per_entry: Duration::from_millis(100) / 1_000,
// for upload/download: assume 1GByte/s
wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000,
};
Arc::new(ObjectStoreImpl::new_in_memory_throttled(config))
}
criterion_group!(benches, benchmark_catalog_persistence);
criterion_main!(benches);


@@ -1,352 +0,0 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rand::{distributions::Uniform, Rng};
use std::convert::TryFrom;
use std::mem;
mod fixtures;
const LARGER_BATCH_SIZES: [usize; 12] = [
10, 25, 50, 100, 250, 500, 750, 1_000, 5_000, 10_000, 50_000, 100_000,
];
const SMALLER_BATCH_SIZES: [usize; 11] =
[10, 25, 50, 100, 250, 500, 750, 1_000, 5_000, 10_000, 45_000];
type EncodeFn<T> = fn(src: &[T], dst: &mut Vec<u8>) -> Result<(), Box<dyn std::error::Error>>;
type DecodeFn<T> = fn(src: &[u8], dst: &mut Vec<T>) -> Result<(), Box<dyn std::error::Error>>;
fn benchmark_encode_sequential<T: From<i32>>(
c: &mut Criterion,
benchmark_group_name: &str,
batch_sizes: &[usize],
encode: EncodeFn<T>,
) {
benchmark_encode(
c,
benchmark_group_name,
batch_sizes,
|batch_size| (1..batch_size).map(convert_from_usize).collect(),
encode,
);
}
fn benchmark_encode<T>(
c: &mut Criterion,
benchmark_group_name: &str,
batch_sizes: &[usize],
decoded_value_generation: fn(batch_size: usize) -> Vec<T>,
encode: EncodeFn<T>,
) {
let mut group = c.benchmark_group(benchmark_group_name);
for &batch_size in batch_sizes {
group.throughput(Throughput::Bytes(
u64::try_from(batch_size * mem::size_of::<T>()).unwrap(),
));
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
&batch_size,
|b, &batch_size| {
let decoded = decoded_value_generation(batch_size);
let mut encoded = vec![];
b.iter(|| {
encode(&decoded, &mut encoded).unwrap();
});
},
);
}
group.finish();
}
fn benchmark_decode<T>(
c: &mut Criterion,
benchmark_group_name: &str,
batch_sizes: &[usize],
input_value_generation: fn(batch_size: usize) -> (usize, Vec<u8>),
decode: DecodeFn<T>,
) {
let mut group = c.benchmark_group(benchmark_group_name);
for &batch_size in batch_sizes {
let (decoded_len, encoded) = input_value_generation(batch_size);
group.throughput(Throughput::Bytes(u64::try_from(encoded.len()).unwrap()));
group.bench_with_input(
BenchmarkId::from_parameter(batch_size),
&decoded_len,
|b, &decoded_len| {
let mut decoded_mut = Vec::with_capacity(decoded_len);
b.iter(|| {
decoded_mut.clear();
decode(&encoded, &mut decoded_mut).unwrap();
});
},
);
}
group.finish();
}
fn convert_from_usize<T: From<i32>>(a: usize) -> T {
i32::try_from(a).unwrap().into()
}
// The current float encoder produces the following compression:
//
// values block size compression
// 10 33 26.4 bits/value
// 25 52 16.64 bits/value
// 50 78 12.48 bits/value
// 100 129 10.32 bits/value
// 250 290 9.28 bits/value
// 500 584 9.34 bits/value
// 750 878 9.36 bits/value
// 1000 1221 9.76 bits/value
// 5000 7013 11.22 bits/value
// 10000 15145 12.11 bits/value
// 50000 90090 14.41 bits/value
// 100000 192481 15.39 bits/value
//
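// Added note (not in the original comment): the compression column is block size in
// bytes * 8 divided by the number of values, e.g. 129 * 8 / 100 = 10.32 bits/value.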
fn float_encode_sequential(c: &mut Criterion) {
benchmark_encode_sequential(
c,
"float_encode_sequential",
&LARGER_BATCH_SIZES,
influxdb_tsm::encoders::float::encode,
);
}
// The current integer encoder produces the following compression. Note, since
// a sequential range of values can be encoded using RLE the compression
// statistics are not very interesting.
//
// values block size compression
// 10 11 8.80 bits/value
// 25 11 3.52 bits/value
// 50 11 1.76 bits/value
// 100 11 0.88 bits/value
// 250 12 0.38 bits/value
// 500 12 0.19 bits/value
// 750 12 0.12 bits/value
// 1000 12 0.09 bits/value
// 5000 12 0.01 bits/value
// 10000 12 0.00 bits/value
// 50000 13 0.00 bits/value
// 100000 13 0.00 bits/value
//
fn integer_encode_sequential(c: &mut Criterion) {
benchmark_encode_sequential(
c,
"integer_encode_sequential",
&LARGER_BATCH_SIZES,
influxdb_tsm::encoders::integer::encode,
);
}
fn timestamp_encode_sequential(c: &mut Criterion) {
benchmark_encode_sequential(
c,
"timestamp_encode_sequential",
&LARGER_BATCH_SIZES,
influxdb_tsm::encoders::timestamp::encode,
);
}
// The current float encoder produces the following compression:
//
// values block size compression
// 10 32 25.6 bits/value
// 25 76 24.32 bits/value
// 50 86 13.76 bits/value
// 100 167 13.36 bits/value
// 250 388 12.41 bits/value
// 500 1165 18.64 bits/value
// 750 1769 18.86 bits/value
// 1000 2366 18.92 bits/value
// 5000 11785 18.85 bits/value
// 10000 23559 18.84 bits/value
// 50000 117572 18.81 bits/value
// 100000 235166 18.81 bits/value
//
fn float_encode_random(c: &mut Criterion) {
benchmark_encode(
c,
"float_encode_random",
&LARGER_BATCH_SIZES,
|batch_size| {
let range = Uniform::from(0.0..100.0);
rand::thread_rng()
.sample_iter(&range)
.take(batch_size)
.collect()
},
influxdb_tsm::encoders::float::encode,
)
}
// The current integer encoder produces the following compression:
//
// values block size compression
// 10 25 20.00 bits/value
// 25 33 10.56 bits/value
// 50 65 10.40 bits/value
// 100 121 9.68 bits/value
// 250 281 8.99 bits/value
// 500 561 8.97 bits/value
// 750 833 8.88 bits/value
// 1000 1105 8.84 bits/value
// 5000 5425 8.68 bits/value
// 10000 10865 8.69 bits/value
// 50000 54361 8.69 bits/value
// 100000 108569 8.68 bits/value
//
fn integer_encode_random(c: &mut Criterion) {
benchmark_encode(
c,
"integer_encode_random",
&LARGER_BATCH_SIZES,
|batch_size| {
(1..batch_size)
.map(|_| rand::thread_rng().gen_range(0..100))
.collect()
},
influxdb_tsm::encoders::integer::encode,
)
}
// The current float encoder produces the following compression:
//
// values block size compression
// 10 91 72.8 bits/value
// 25 208 66.56 bits/value
// 50 411 65.76 bits/value
// 100 809 64.72 bits/value
// 250 2028 64.89 bits/value
// 500 4059 64.94 bits/value
// 750 6091 64.97 bits/value
// 1000 8122 64.97 bits/value
// 5000 40614 64.98 bits/value
// 10000 81223 64.97 bits/value
// 45000 365470 64.97 bits/value
//
fn float_encode_cpu(c: &mut Criterion) {
benchmark_encode(
c,
"float_encode_cpu",
&SMALLER_BATCH_SIZES,
|batch_size| fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec(),
influxdb_tsm::encoders::float::encode,
)
}
fn float_decode_cpu(c: &mut Criterion) {
benchmark_decode(
c,
"float_decode_cpu",
&SMALLER_BATCH_SIZES,
|batch_size| {
let decoded: Vec<f64> = fixtures::CPU_F64_EXAMPLE_VALUES[..batch_size].to_vec();
let mut encoded = vec![];
influxdb_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
influxdb_tsm::encoders::float::decode,
)
}
fn float_decode_sequential(c: &mut Criterion) {
benchmark_decode(
c,
"float_decode_sequential",
&LARGER_BATCH_SIZES,
|batch_size| {
let decoded: Vec<f64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
influxdb_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
influxdb_tsm::encoders::float::decode,
)
}
fn integer_decode_sequential(c: &mut Criterion) {
benchmark_decode(
c,
"integer_decode_sequential",
&LARGER_BATCH_SIZES,
|batch_size| {
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
influxdb_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
influxdb_tsm::encoders::integer::decode,
)
}
fn timestamp_decode_sequential(c: &mut Criterion) {
benchmark_decode(
c,
"timestamp_decode_sequential",
&LARGER_BATCH_SIZES,
|batch_size| {
let decoded: Vec<i64> = (1..batch_size).map(convert_from_usize).collect();
let mut encoded = vec![];
influxdb_tsm::encoders::timestamp::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
influxdb_tsm::encoders::timestamp::decode,
)
}
fn float_decode_random(c: &mut Criterion) {
benchmark_decode(
c,
"float_decode_random",
&LARGER_BATCH_SIZES,
|batch_size| {
let range = Uniform::from(0.0..100.0);
let decoded: Vec<_> = rand::thread_rng()
.sample_iter(&range)
.take(batch_size)
.collect();
let mut encoded = vec![];
influxdb_tsm::encoders::float::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
influxdb_tsm::encoders::float::decode,
)
}
fn integer_decode_random(c: &mut Criterion) {
benchmark_decode(
c,
"integer_decode_random",
&LARGER_BATCH_SIZES,
|batch_size| {
let decoded: Vec<i64> = (1..batch_size)
.map(|_| rand::thread_rng().gen_range(0..100))
.collect();
let mut encoded = vec![];
influxdb_tsm::encoders::integer::encode(&decoded, &mut encoded).unwrap();
(decoded.len(), encoded)
},
influxdb_tsm::encoders::integer::decode,
)
}
criterion_group!(
benches,
float_encode_sequential,
integer_encode_sequential,
timestamp_encode_sequential,
float_encode_random,
integer_encode_random,
float_encode_cpu,
float_decode_cpu,
float_decode_sequential,
integer_decode_sequential,
timestamp_decode_sequential,
float_decode_random,
integer_decode_random,
);
criterion_main!(benches);
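For reference, a minimal round-trip sketch of the influxdb_tsm encoder API these benchmarks drive; the signatures match the EncodeFn/DecodeFn aliases above, but this standalone main is illustrative only and was not part of the deleted file:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Encode a small sequential series of floats, then decode and compare.
    let decoded: Vec<f64> = (0..100).map(f64::from).collect();
    let mut encoded = vec![];
    influxdb_tsm::encoders::float::encode(&decoded, &mut encoded)?;
    let mut roundtripped = Vec::with_capacity(decoded.len());
    influxdb_tsm::encoders::float::decode(&encoded, &mut roundtripped)?;
    assert_eq!(decoded, roundtripped);
    Ok(())
}

The float, integer, and timestamp encoders all share this (src, dst) shape, which is why the benchmarks can pass them around as plain fn pointers.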

File diff suppressed because it is too large.


@@ -1,17 +0,0 @@
mod read_filter;
mod read_group;
mod tag_values;
use criterion::{criterion_group, criterion_main};
use read_filter::benchmark_read_filter;
use read_group::benchmark_read_group;
use tag_values::benchmark_tag_values;
criterion_group!(
benches,
benchmark_tag_values,
benchmark_read_filter,
benchmark_read_group,
);
criterion_main!(benches);


@@ -1,28 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use std::time::Duration;
static LINES: &str = include_str!("../../test_fixtures/lineproto/prometheus.lp");
fn line_parser(c: &mut Criterion) {
let mut group = c.benchmark_group("line_parser");
// group.throughput(Throughput::Elements(LINES.lines().count() as u64));
group.throughput(Throughput::Bytes(LINES.len() as u64));
group.measurement_time(Duration::from_secs(30));
group.bench_function("all lines", |b| {
b.iter(|| {
let lines = influxdb_line_protocol::parse_lines(LINES)
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(554, lines.len());
})
});
group.finish();
}
criterion_group!(benches, line_parser);
criterion_main!(benches);
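A minimal sketch of the same influxdb_line_protocol::parse_lines call the benchmark exercises, fed one hand-written line instead of the 554-line prometheus fixture (the sample line itself is made up):

fn main() {
    // Parse a single line of line protocol and check it was accepted.
    let lp = "cpu,host=server01 usage=0.5 1556813561098000000";
    let lines = influxdb_line_protocol::parse_lines(lp)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(lines.len(), 1);
}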


@@ -1,101 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion};
use influxdb_tsm::mapper::*;
use influxdb_tsm::reader::*;
use influxdb_tsm::*;
use std::collections::BTreeMap;
fn map_field_columns(c: &mut Criterion) {
let mut group = c.benchmark_group("mapper");
let mut measurement_table = mapper::MeasurementTable::new("cpu".to_string(), 0);
measurement_table
.add_series_data(
vec![],
"temp".to_string(),
Block {
min_time: 0,
max_time: 0,
offset: 0,
size: 0,
typ: influxdb_tsm::BlockType::Float,
reader_idx: 0,
},
)
.unwrap();
measurement_table
.add_series_data(
vec![],
"temp".to_string(),
Block {
min_time: 1,
max_time: 0,
offset: 0,
size: 0,
typ: influxdb_tsm::BlockType::Float,
reader_idx: 0,
},
)
.unwrap();
measurement_table
.add_series_data(
vec![],
"voltage".to_string(),
Block {
min_time: 2,
max_time: 0,
offset: 0,
size: 0,
typ: influxdb_tsm::BlockType::Integer,
reader_idx: 0,
},
)
.unwrap();
// setup mock block decoder
let block0 = BlockData::Float {
i: 0,
values: vec![100.0; 1000],
ts: (0..1000).collect(),
};
let block1 = BlockData::Float {
i: 0,
values: vec![200.0; 500],
ts: (1000..1500).collect(),
};
let block2 = BlockData::Integer {
i: 0,
values: vec![22; 800],
ts: (1000..1800).collect(),
};
let mut block_map = BTreeMap::new();
block_map.insert(0, block0);
block_map.insert(1, block1);
block_map.insert(2, block2);
let decoder = reader::MockBlockDecoder::new(block_map);
group.bench_function("map_field_columns", move |b| {
b.iter_batched(
|| (decoder.clone(), measurement_table.clone()),
|(mut data, mut measurement_table)| {
measurement_table
.process(&mut data, |section: TableSection| -> Result<(), TsmError> {
assert_eq!(section.len(), 1800);
Ok(())
})
.unwrap();
},
criterion::BatchSize::LargeInput,
)
});
group.finish();
}
criterion_group!(benches, map_field_columns);
criterion_main!(benches);


@@ -1,133 +0,0 @@
use criterion::{BenchmarkId, Criterion};
use datafusion::logical_plan::{col, lit};
use std::{io::Read, sync::Arc};
// The async benchmark bodies below are executed on a Tokio `Runtime` via `b.to_async`.
use db::Db;
use flate2::read::GzDecoder;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::{Executor, ExecutorType},
frontend::influxrpc::InfluxRpcPlanner,
};
use query_tests::scenarios::DbScenario;
use tokio::runtime::Runtime;
// Uses the `query_tests` module to generate some chunk scenarios, specifically
// the scenarios where there are:
//
// - a single open mutable buffer chunk;
// - a closed mutable buffer chunk and another open one;
// - an open mutable buffer chunk and a closed read buffer chunk;
// - two closed read buffer chunks.
//
// The chunks are all fed the *same* line protocol, so these benchmarks are
// useful for assessing the differences in performance between querying the
// chunks held in different execution engines.
//
// These benchmarks use a synthetically generated set of line protocol using
// `inch`. Each point is a new series containing 10 tag keys, which results in
// ten columns in IOx. There is a single field column and a timestamp column.
//
// - tag0, cardinality 2.
// - tag1, cardinality 10.
// - tag2, cardinality 10.
// - tag3, cardinality 50.
// - tag4, cardinality 100.
//
// In total there are 10K rows. The timespan of the points in the line
// protocol is around 1m of wall-clock time.
async fn setup_scenarios() -> Vec<DbScenario> {
let raw = include_bytes!("../../test_fixtures/lineproto/read_filter.lp.gz");
let mut gz = GzDecoder::new(&raw[..]);
let mut lp = String::new();
gz.read_to_string(&mut lp).unwrap();
let db =
query_tests::scenarios::util::make_two_chunk_scenarios("2021-04-26T13", &lp, &lp).await;
db
}
// Run all benchmarks for `read_filter`.
pub fn benchmark_read_filter(c: &mut Criterion) {
let scenarios = Runtime::new().unwrap().block_on(setup_scenarios());
execute_benchmark_group(c, scenarios.as_slice());
}
// Runs an async criterion benchmark against the provided scenarios and
// predicate.
fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
let planner = InfluxRpcPlanner::default();
let predicates = vec![
(InfluxRpcPredicate::default(), "no_pred"),
(
InfluxRpcPredicate::new(
None,
PredicateBuilder::default()
.add_expr(col("tag3").eq(lit("value49")))
.build(),
),
"with_pred_tag_3=value49",
),
];
for scenario in scenarios {
let DbScenario { scenario_name, db } = scenario;
let mut group = c.benchmark_group(format!("read_filter/{}", scenario_name));
// downcast Db for performance
let db = Arc::downcast::<Db>(Arc::clone(db).as_any_arc()).unwrap();
for (predicate, pred_name) in &predicates {
let chunks = db
.filtered_chunk_summaries(None, Some("2021-04-26T13"))
.len();
// The number of expected frames, based on the expected number of
// individual series keys.
let exp_data_frames = if predicate.is_empty() { 10000 } else { 200 } * chunks;
group.bench_with_input(
BenchmarkId::from_parameter(pred_name),
predicate,
|b, predicate| {
let executor = db.executor();
b.to_async(Runtime::new().unwrap()).iter(|| {
build_and_execute_plan(
&planner,
executor.as_ref(),
&db,
predicate.clone(),
exp_data_frames,
)
});
},
);
}
group.finish();
}
}
// Plans and runs a read_filter query.
async fn build_and_execute_plan(
planner: &InfluxRpcPlanner,
executor: &Executor,
db: &Db,
predicate: InfluxRpcPredicate,
exp_data_frames: usize,
) {
let plan = planner
.read_filter(db, predicate)
.await
.expect("built plan successfully");
let results = executor
.new_context(ExecutorType::Query)
.to_series_and_groups(plan)
.await
.expect("Running series set plan");
assert_eq!(results.len(), exp_data_frames);
}


@@ -1,140 +0,0 @@
use criterion::{BenchmarkId, Criterion};
use datafusion::logical_plan::{col, lit};
use std::{io::Read, sync::Arc};
// The async benchmark bodies below are executed on a Tokio `Runtime` via `b.to_async`.
use db::Db;
use flate2::read::GzDecoder;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::{Executor, ExecutorType},
frontend::influxrpc::InfluxRpcPlanner,
Aggregate,
};
use query_tests::scenarios::DbScenario;
use tokio::runtime::Runtime;
// Uses the `query_tests` module to generate some chunk scenarios, specifically
// the scenarios where there are:
//
// - a single open mutable buffer chunk;
// - a closed mutable buffer chunk and another open one;
// - an open mutable buffer chunk and a closed read buffer chunk;
// - two closed read buffer chunks.
//
// The chunks are all fed the *same* line protocol, so these benchmarks are
// useful for assessing the differences in performance between querying the
// chunks held in different execution engines.
//
// These benchmarks use a synthetically generated set of line protocol using
// `inch`. Each point is a new series containing 5 tag keys, which results in
// five tag columns in IOx. There is a single field column and a timestamp column.
//
// - tag0, cardinality 2.
// - tag1, cardinality 10.
// - tag2, cardinality 10.
// - tag3, cardinality 50.
// - tag4, cardinality 1.
//
// In total there are 10K rows. The timespan of the points in the line
// protocol is around 1m of wall-clock time.
async fn setup_scenarios() -> Vec<DbScenario> {
let raw = include_bytes!("../../test_fixtures/lineproto/read_filter.lp.gz");
let mut gz = GzDecoder::new(&raw[..]);
let mut lp = String::new();
gz.read_to_string(&mut lp).unwrap();
let db =
query_tests::scenarios::util::make_two_chunk_scenarios("2021-04-26T13", &lp, &lp).await;
db
}
// Run all benchmarks for `read_group`.
pub fn benchmark_read_group(c: &mut Criterion) {
let scenarios = Runtime::new().unwrap().block_on(setup_scenarios());
execute_benchmark_group(c, scenarios.as_slice());
}
// Runs an async criterion benchmark against the provided scenarios and
// predicate.
fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
let planner = InfluxRpcPlanner::default();
let predicates = vec![
(InfluxRpcPredicate::default(), "no_pred"),
(
InfluxRpcPredicate::new(
None,
PredicateBuilder::default()
.add_expr(col("tag3").eq(lit("value49")))
.build(),
),
"with_pred_tag_3=value49",
),
];
for scenario in scenarios {
let DbScenario { scenario_name, db } = scenario;
let mut group = c.benchmark_group(format!("read_group/{}", scenario_name));
// downcast Db for performance
let db = Arc::downcast::<Db>(Arc::clone(db).as_any_arc()).unwrap();
for (predicate, pred_name) in &predicates {
// The number of expected frames, based on the expected number of
// individual series keys, which for grouping is the same no matter
// how many chunks we query over.
let exp_data_frames = if predicate.is_empty() {
10000 + 10 // 10 groups when grouping on `tag2`
} else {
200 + 10 // 10 groups when grouping on `tag2`
};
group.bench_with_input(
BenchmarkId::from_parameter(pred_name),
predicate,
|b, predicate| {
let executor = db.executor();
b.to_async(Runtime::new().unwrap()).iter(|| {
build_and_execute_plan(
&planner,
executor.as_ref(),
&db,
predicate.clone(),
Aggregate::Sum,
&["tag2"],
exp_data_frames,
)
});
},
);
}
group.finish();
}
}
// Plans and runs a read_group query.
async fn build_and_execute_plan(
planner: &InfluxRpcPlanner,
executor: &Executor,
db: &Db,
predicate: InfluxRpcPredicate,
agg: Aggregate,
group: &[&str],
exp_frames: usize,
) {
let plan = planner
.read_group(db, predicate, agg, group)
.await
.expect("built plan successfully");
let results = executor
.new_context(ExecutorType::Query)
.to_series_and_groups(plan)
.await
.expect("Running series set plan");
assert_eq!(results.len(), exp_frames);
}


@@ -1,73 +0,0 @@
#!/usr/bin/env bash
#
# This script takes a git sha as input, checks out and runs cargo
# benchmarks at that sha, and appends the results, as line protocol to
# iox_bench.lp
#
# Example
# ./run_bench.sh /Users/alamb/Software/influxdb_iox2 a6ed8d59dc0465ff3a605b1fd4783faa9edb8560
#
# A list of commits can be found via
# git rev-list main
#
# So to run this thing on the last 10 commits, you can use a command such as
# git rev-list main | head | sed -e 's|^|./run_bench.sh /Users/alamb/Software/influxdb_iox2 |'
read -r -d '' USAGE << EOF
Usage:
$0 <source directory> gitsha
Example:
$0 /Users/alamb/Software/influxdb_iox2 a6ed8d59dc0465ff3a605b1fd4783faa9edb8560
Log is written to stdout
Performance results are appended to iox_bench.lp
EOF
# Location of this script
# https://stackoverflow.com/questions/59895/how-to-get-the-source-directory-of-a-bash-script-from-within-the-script-itself
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
SOURCE_DIR=$1
if [ -z "$SOURCE_DIR" ] ; then
echo "Error: no source directory specified"
echo "$USAGE"
exit 1
fi
if [ ! -d "$SOURCE_DIR" ] ; then
echo "Error: Not a directory: $SOURCE_DIR"
echo "$USAGE"
exit 1
fi
GITSHA=$2
if [ -z "$GITSHA" ] ; then
echo "Error: no gitsha specified"
echo "$USAGE"
exit 1
fi
RESULTS_FILE=$SCRIPT_DIR/iox_bench.lp
echo "**************************"
echo "InfluxDB IOx Benchmark Tool"
echo "**************************"
echo "Run starting at $(date)"
echo "Running on host $(hostname)"
echo "Source Directory: $SOURCE_DIR"
echo "Git SHA: $GITSHA"
echo "Results file: $RESULTS_FILE"
pushd $SOURCE_DIR
set -x
git checkout $GITSHA || (echo "Error checking out code. Aborting." && exit 2)
rm -rf target/criterion # remove any old results
# Debugging tip: use a command like this to run a subset of the benchmarks:
#cargo bench -- float_encode_sequential/100000 || (echo "Error running benchmarks. Aborting." && exit 3)
cargo bench || (echo "Error running benchmarks. Aborting." && exit 3)
# now, run the scraper and append results
$SCRIPT_DIR/scrape_benches.sh $SOURCE_DIR | tee -a $RESULTS_FILE
set +x
popd


@@ -1,106 +0,0 @@
#!/bin/bash
#
# Scrapes the most recent run of all criterion benchmark results into lineprotocol for analysis
#
# Generate data:
# (cd ~/Codes/influxdb_iox && cargo bench)
#
# Scrape data:
# ./scrape_benches.sh /Users/alamb/Software/influxdb_iox2
#
# To load to
function main {
SOURCE_DIR=$1
if [ -z "$SOURCE_DIR" ] ; then
echo "Error: influxdb_iox source directory not specified"
echo "Usage: $0 <influxdb_iox_dir>"
exit 1
fi
GITSHA=`cd $SOURCE_DIR && git rev-parse HEAD`
# pick timestamp of the commit
TIMESTAMP=$((`git show -s --format="%ct" $GITSHA` * 1000000000)) # ct means seconds since epoch
# note can use this to use the time this script ran
# TIMESTAMP=$((`date +%s` * 1000000000)) # +%s means seconds since epoch
# Criterion produces files named like this:
#
# target/criterion/float_encode_sequential/10000/new/estimates.json
# target/criterion/float_encode_sequential/10000/base/estimates.json
# target/criterion/float_encode_sequential/100000/change/estimates.json
# target/criterion/float_encode_sequential/100000/sample1/estimates.json
# target/criterion/float_encode_sequential/100000/new/estimates.json
# target/criterion/float_encode_sequential/100000/base/estimates.json
#
# The new/estimates.json are the ones from the most recent run
find "$SOURCE_DIR/target/criterion" -name 'estimates.json' | grep 'new/estimates.json'| while read estimates_file ;
do
process_file "$estimates_file"
done
}
# Processes a criterion results file and produces line protocol out
#
# Input:
# {
# "mean":{
# "confidence_interval":{"confidence_level":0.95,"lower_bound":92384.98456288037,"upper_bound":94127.8605349043},
# "point_estimate":93193.31282952648,
# "standard_error":444.9439871182596
# },
# "median":{
# "confidence_interval":{"confidence_level":0.95,"lower_bound":91137.96363636364,"upper_bound":92769.5854020979},
# "point_estimate":91426.08165568294,
# "standard_error":505.4331525578268
# },
# "median_abs_dev": .. (same structure )
# "slope": .. (same structure )
# "std_dev": .. (same structure )
# }
#
# Output: (line protocol)
#
# bench,gitsha=<gitsha>,hostname=trogdor,group_name=float_encode_sequential,bench_name=10000 mean=93193.31282952648,mean_standard_error=444.9439871182596,median=91426.08165568294,median_standard_error=505.4331525578268
function process_file {
estimates_file=$1
#echo "processing $estimates_file"
# example estimates_file:
# /path/target/criterion/float_encode_sequential/10000/new/estimates.json
# find the benchmark name (encoded as a filename)
[[ $estimates_file =~ ^.*target/criterion/(.*)/new/estimates.json$ ]] && dirname=${BASH_REMATCH[1]}
# echo $dirname
# float_encode_sequential/10000
#echo "dirname: $dirname"
# split on `/`
# https://stackoverflow.com/questions/918886/how-do-i-split-a-string-on-a-delimiter-in-bash)
IFS=/ read -a fields <<<"$dirname"
#echo "fields[0]: ${fields[0]}"
#echo "fields[1]: ${fields[1]}"
# fields[0]: float_encode_sequential
# fields[1]: 10000
# some benchmark names have spaces in them (thumbs down), so replace them with _
group_name=${fields[0]/ /_}
bench_name=${fields[1]/ /_}
hostname=`hostname`
echo -n "bench,gitsha=$GITSHA,hostname=${hostname},group_name=$group_name,bench_name=$bench_name "
# use the jq command to pull out the various fields
echo -n `jq -j '"mean=" + (.mean.point_estimate | tostring), ",mean_standard_error=" + (.mean.standard_error | tostring), ",median=" + (.median.point_estimate | tostring), ",median_standard_error=" + (.median.standard_error | tostring)' "$estimates_file"`
echo -n " $TIMESTAMP"
echo
}
main $*


@@ -1,38 +0,0 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use flate2::read::GzDecoder;
use mutable_buffer::MBChunk;
use std::io::Read;
#[inline]
fn snapshot_chunk(chunk: &MBChunk) {
let _ = chunk.snapshot();
}
fn chunk(count: usize) -> MBChunk {
let raw = include_bytes!("../../test_fixtures/lineproto/tag_values.lp.gz");
let mut gz = GzDecoder::new(&raw[..]);
let mut lp = String::new();
gz.read_to_string(&mut lp).unwrap();
let mut chunk = mutable_buffer::test_helpers::write_lp_to_new_chunk(&lp);
for _ in 1..count {
mutable_buffer::test_helpers::write_lp_to_chunk(&lp, &mut chunk);
}
chunk
}
pub fn snapshot_mb(c: &mut Criterion) {
let mut group = c.benchmark_group("snapshot_mb");
for count in &[1, 2, 3, 4, 5] {
let chunk = chunk(*count as _);
group.bench_function(BenchmarkId::from_parameter(count), |b| {
b.iter(|| snapshot_chunk(&chunk));
});
}
group.finish();
}
criterion_group!(benches, snapshot_mb);
criterion_main!(benches);


@@ -1,65 +0,0 @@
#!/bin/bash
# This script was based on the benchmark script in
# https://github.com/influxdata/idpe/blob/b304e751e1/cmd/storectl/benchmark/bench.sh
# TODO: Better management of relationship of this checkout to the idpe checkout
idpe_checkout_dir=~/Go/idpe
org_id=0000111100001111
bucket_id=1111000011110000
tmp_dir=$(mktemp -d -t bench)
mkdir -p $tmp_dir/data/0
mkdir -p $tmp_dir/data/object/0
echo "Working in temporary directory $tmp_dir."
echo "Run:"
echo " rm -rf $tmp_dir"
echo "to clean up in the end."
echo "Building binaries..."
cargo build --release && \
# TODO: Teach storectl to generate line protocol instead of using inch?
cd $idpe_checkout_dir/cmd/inch && \
go build -o $tmp_dir/bin/inch . && \
cd - > /dev/null && \
# TODO: Figure out how to build storectl from outside the idpe checkout
cd $idpe_checkout_dir/cmd/storectl/benchmark && \
go build -o $tmp_dir/bin/storectl ../../storectl && \
cd - > /dev/null
if [[ $? -ne 0 ]]; then
exit 1
fi
# Once IOx can ingest what `storectl generate` creates, this section will be needed.
# cat > $tmp_dir/data.toml << EOL
# title = "CLI schema"
#
# [[measurements]]
# name = "m0"
# sample = 1.0
# tags = [
# { name = "tag0", source = { type = "sequence", format = "value%s", start = 0, count = 30 } },
# { name = "tag1", source = { type = "sequence", format = "value%s", start = 0, count = 20 } },
# { name = "tag2", source = { type = "sequence", format = "value%s", start = 0, count = 10 } },
# ]
# fields = [
# { name = "v0", count = 10, source = 1.0 },
# ]
# EOL
#
# $tmp_dir/bin/storectl generate --base-dir $tmp_dir/data --org-id=$org_id --bucket-id=$bucket_id $tmp_dir/data.toml --clean=none
sess=influxdb-iox-rpc-bench
tmux new-session -t $sess -d
tmux rename-window -t $sess 'bench'
tmux send-keys "./target/release/influxdb_iox" 'C-m'
tmux split-window -t $sess -v
tmux send-keys "sleep 5; curl 'http://localhost:8080/api/v2/create_bucket' -d org=$org_id -d bucket=$bucket_id; $tmp_dir/bin/inch -bucket $bucket_id -org $org_id -host http://localhost:8080" 'C-m'
tmux select-pane -t $sess -R
tmux split-window -t $sess -v
tmux send-keys "$tmp_dir/bin/storectl query -b $bucket_id -o $org_id --silent -c 1 --csv-out --expr \"tag0='value0'\""
tmux attach -t $sess


@@ -1,132 +0,0 @@
use criterion::{BenchmarkId, Criterion};
use datafusion::logical_plan::{col, lit};
use std::{io::Read, sync::Arc};
// The async benchmark bodies below are executed on a Tokio `Runtime` via `b.to_async`.
use db::Db;
use flate2::read::GzDecoder;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::{
exec::{Executor, ExecutorType},
frontend::influxrpc::InfluxRpcPlanner,
};
use query_tests::scenarios::DbScenario;
use tokio::runtime::Runtime;
// Uses the `query_tests` module to generate some chunk scenarios, specifically
// the scenarios where there are:
//
// - a single open mutable buffer chunk;
// - a closed mutable buffer chunk and another open one;
// - an open mutable buffer chunk and a closed read buffer chunk;
// - two closed read buffer chunks.
//
// The chunks are all fed the *same* line protocol, so these benchmarks are
// useful for assessing the differences in performance between querying the
// chunks held in different execution engines.
//
// These benchmarks use a synthetically generated set of line protocol using
// `inch`. Each point is a new series containing three tag keys. Those tag keys
// are:
//
// - tag0, cardinality 10.
// - tag1, cardinality 100.
// - tag2, cardinality 1,000.
//
// The timespan of the points in the line protocol is around 1m of wall-clock
// time.
async fn setup_scenarios() -> Vec<DbScenario> {
let raw = include_bytes!("../../test_fixtures/lineproto/tag_values.lp.gz");
let mut gz = GzDecoder::new(&raw[..]);
let mut lp = String::new();
gz.read_to_string(&mut lp).unwrap();
let db =
query_tests::scenarios::util::make_two_chunk_scenarios("2021-04-12T17", &lp, &lp).await;
db
}
// Run all benchmarks for `tag_values`.
pub fn benchmark_tag_values(c: &mut Criterion) {
let scenarios = Runtime::new().unwrap().block_on(setup_scenarios());
execute_benchmark_group(c, scenarios.as_slice());
}
// Runs an async criterion benchmark against the provided scenarios and
// predicate.
fn execute_benchmark_group(c: &mut Criterion, scenarios: &[DbScenario]) {
let planner = InfluxRpcPlanner::default();
let predicates = vec![
(InfluxRpcPredicate::default(), "no_pred"),
(
InfluxRpcPredicate::new(
None,
PredicateBuilder::default()
.add_expr(col("tag2").eq(lit("value321")))
.build(),
),
"with_pred",
),
];
// these tags have different cardinalities: 10, 100, 1000.
let tag_keys = &["tag0", "tag1", "tag2"];
for scenario in scenarios {
let DbScenario { scenario_name, db } = scenario;
let mut group = c.benchmark_group(scenario_name);
// downcast Db for performance
let db = Arc::downcast::<Db>(Arc::clone(db).as_any_arc()).unwrap();
for (predicate, pred_name) in &predicates {
for tag_key in tag_keys {
group.bench_with_input(
BenchmarkId::from_parameter(format!("{}/{}", tag_key, pred_name)),
tag_key,
|b, &tag_key| {
let executor = db.executor();
b.to_async(Runtime::new().unwrap()).iter(|| {
run_tag_values_query(
&planner,
executor.as_ref(),
&db,
tag_key,
predicate.clone(),
)
});
},
);
}
}
group.finish();
}
}
// Plans and runs a tag_values query.
async fn run_tag_values_query(
planner: &InfluxRpcPlanner,
executor: &Executor,
db: &Db,
tag_key: &str,
predicate: InfluxRpcPredicate,
) {
let plan = planner
.tag_values(db, tag_key, predicate)
.await
.expect("built plan successfully");
let names = executor
.new_context(ExecutorType::Query)
.to_string_set(plan)
.await
.expect("converted plan to strings successfully");
assert!(names.len() > 0);
}


@@ -1 +0,0 @@
//! this crate exists only for its benchmarks